diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b2d59fc..f19dc38d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -62,6 +62,7 @@ if(OPENDAQ_MQTT_MODULE_ENABLE_SSL) endif() add_subdirectory(external) +add_subdirectory(helper_utils) add_subdirectory(mqtt_streaming_protocol) add_subdirectory(mqtt_streaming_module) diff --git a/examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp b/examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp index 1de10286..5104143b 100644 --- a/examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp +++ b/examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp @@ -9,7 +9,7 @@ using namespace daq; struct ConfigStruct { std::string brokerAddress; - std::vector topics; + std::string topic; bool exit = true; int error = 0; }; @@ -20,7 +20,7 @@ ConfigStruct StartUp(int argc, char* argv[]) InputArgs args; args.addArg("--help", "Show help message"); args.addArg("--address", "MQTT broker address", true); - args.setUsageHelp(APP_NAME " [options] ... "); + args.setUsageHelp(APP_NAME " [options] "); args.parse(argc, argv); if (args.hasArg("--help") || args.hasUnknownArgs()) @@ -31,14 +31,14 @@ ConfigStruct StartUp(int argc, char* argv[]) } config.brokerAddress = args.getArgValue("--address", "127.0.0.1"); - config.topics = args.getPositionalArgs(); - - if (config.topics.empty()) + const auto positionalArgs = args.getPositionalArgs(); + if (positionalArgs.empty()) { - std::cout << "MQTT topics are required." << std::endl; + std::cout << "An MQTT topic is required." << std::endl; config.error = -1; return config; } + config.topic = args.getPositionalArgs()[0];; config.exit = false; return config; @@ -66,46 +66,34 @@ int main(int argc, char* argv[]) // Create RAW function block configuration auto config = availableFbs.get(fbName).createDefaultConfig(); - auto topicList = List(); - for (auto& topic : appConfig.topics) - { - addToList(topicList, std::move(topic)); - } - config.setPropertyValue("SignalList", topicList); + config.setPropertyValue("Topic", appConfig.topic); // Add the RAW function block to the broker FB daq::FunctionBlockPtr rawFb = brokerFB.addFunctionBlock(fbName, config); - // Create packet readers for all signals - const auto signals = rawFb.getSignals(); - std::map readers; - for (const auto& s : signals) - { - readers.emplace(std::pair(s.getName().toStdString(), daq::PacketReader(s))); - } + // Create packet readers for a signal + const auto signal = rawFb.getSignals()[0]; + PacketReaderPtr reader = daq::PacketReader(signal); - // Start a thread to read packets from the readers + // Start a thread to read packets from the reader std::atomic running = true; std::thread readerThread( - [&readers, &running]() + [&reader, &signal, &running]() { while (running) { - for (const auto& [signalName, reader] : readers) + while (!reader.getEmpty() && running) { - while (!reader.getEmpty() && running) + auto packet = reader.read(); + if (packet.getType() == PacketType::Event) + { + continue; + } + else if (packet.getType() == PacketType::Data) { - auto packet = reader.read(); - if (packet.getType() == PacketType::Event) - { - continue; - } - else if (packet.getType() == PacketType::Data) - { - const auto dataPacket = packet.asPtr(); - std::string dataStr(static_cast(dataPacket.getData()), dataPacket.getDataSize()); - std::cout << signalName << " - " << dataStr << std::endl; - } + const auto dataPacket = packet.asPtr(); + std::string dataStr(static_cast(dataPacket.getData()), dataPacket.getDataSize()); + std::cout << signal.getName() << " - " << dataStr << std::endl; } } std::this_thread::sleep_for(std::chrono::milliseconds(20)); diff --git a/helper_utils/CMakeLists.txt b/helper_utils/CMakeLists.txt new file mode 100644 index 00000000..91ce7abf --- /dev/null +++ b/helper_utils/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.10) + +set(HELPER_LIB mqtt_streaming_helper) + +set(HELPER_PRJ_VERSION "1.0.0") +set(HELPER_PRJ_NAME "HelperUtils") + +project(${HELPER_PRJ_NAME} VERSION ${HELPER_PRJ_VERSION} LANGUAGES C CXX) + + + +add_library(${HELPER_LIB} INTERFACE) + +target_include_directories(${HELPER_LIB} INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/include +) + +target_sources(${HELPER_LIB} INTERFACE + ${CMAKE_CURRENT_SOURCE_DIR}/include/mqtt_streaming_helper/timer.h +) diff --git a/mqtt_streaming_protocol/tests/Timer.h b/helper_utils/include/mqtt_streaming_helper/timer.h similarity index 61% rename from mqtt_streaming_protocol/tests/Timer.h rename to helper_utils/include/mqtt_streaming_helper/timer.h index 26eb3fa0..fe103965 100644 --- a/mqtt_streaming_protocol/tests/Timer.h +++ b/helper_utils/include/mqtt_streaming_helper/timer.h @@ -2,14 +2,19 @@ #include +namespace helper::utils +{ class Timer { public: - Timer(int ms) + Timer(size_t ms, bool start = true) + : period(ms), + firstStart(true) { - start = std::chrono::steady_clock::now(); - timeout = std::chrono::milliseconds(ms); + if (start) + restart(); } + std::chrono::milliseconds remain() const { auto now = std::chrono::steady_clock::now(); @@ -17,16 +22,29 @@ class Timer std::chrono::milliseconds newTout = (elapsed_ms >= timeout) ? std::chrono::milliseconds(0) : timeout - elapsed_ms; return newTout; } + bool expired() { - return remain() == std::chrono::milliseconds(0); + return (firstStart) ? true : (remain() == std::chrono::milliseconds(0)); } + explicit operator std::chrono::milliseconds() const noexcept { return remain(); } + void restart() + { + firstStart = false; + start = std::chrono::steady_clock::now(); + timeout = std::chrono::milliseconds(period); + } + protected: std::chrono::steady_clock::time_point start; std::chrono::milliseconds timeout; + std::chrono::milliseconds period; + bool firstStart; }; + +} // namespace helper::utils diff --git a/mqtt_streaming_module/include/mqtt_streaming_module/constants.h b/mqtt_streaming_module/include/mqtt_streaming_module/constants.h index 96cabef2..9e71e907 100644 --- a/mqtt_streaming_module/include/mqtt_streaming_module/constants.h +++ b/mqtt_streaming_module/include/mqtt_streaming_module/constants.h @@ -17,6 +17,7 @@ static constexpr uint32_t DEFAULT_INIT_TIMEOUT = 3000; // ms static constexpr uint32_t DEFAULT_PUB_READ_PERIOD = 20; // ms static constexpr uint32_t DEFAULT_PUB_QOS = 1; static constexpr uint32_t DEFAULT_PUB_PACK_SIZE = 1; +static constexpr const char* DEFAULT_PUB_TOPIC_NAME = ""; static constexpr const char* PROPERTY_NAME_MQTT_BROKER_ADDRESS = "MqttBrokerAddress"; static constexpr const char* PROPERTY_NAME_MQTT_BROKER_PORT = "MqttBrokerPort"; @@ -24,8 +25,10 @@ static constexpr const char* PROPERTY_NAME_MQTT_USERNAME = "MqttUsername"; static constexpr const char* PROPERTY_NAME_MQTT_PASSWORD = "MqttPassword"; static constexpr const char* PROPERTY_NAME_CONNECT_TIMEOUT = "ConnectTimeout"; static constexpr const char* PROPERTY_NAME_SIGNAL_LIST = "SignalList"; +static constexpr const char* PROPERTY_NAME_TOPIC = "Topic"; static constexpr const char* PROPERTY_NAME_PUB_TOPIC_MODE = "TopicMode"; +static constexpr const char* PROPERTY_NAME_PUB_TOPIC_NAME = "Topic"; static constexpr const char* PROPERTY_NAME_PUB_SHARED_TS = "SharedTimestamp"; static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES = "GroupValues"; static constexpr const char* PROPERTY_NAME_PUB_USE_SIGNAL_NAMES = "UseSignalNames"; @@ -40,8 +43,16 @@ static constexpr const char* ROOT_FB_NAME = "@rootMqttFb"; static const char* MQTT_LOCAL_ROOT_FB_ID_PREFIX = "rootMqttFb"; static const char* MQTT_LOCAL_PUB_FB_ID_PREFIX = "publisherMqttFb"; +static const char* MQTT_LOCAL_RAW_FB_ID_PREFIX = "rawMqttFb"; +static const char* MQTT_LOCAL_JSON_FB_ID_PREFIX = "jsonMqttFb"; static const char* MQTT_ROOT_FB_CON_STATUS_TYPE = "BrokerConnectionStatusType"; +static const char* MQTT_RAW_FB_SUB_STATUS_TYPE = "MqttSubscriptionStatusType"; +static const char* MQTT_PUB_FB_SIG_STATUS_TYPE = "MqttSignalStatusType"; +static const char* MQTT_PUB_FB_PUB_STATUS_TYPE = "MqttPublishingStatusType"; + +static const char* MQTT_PUB_FB_SIG_STATUS_NAME = "SignalStatus"; +static const char* MQTT_PUB_FB_PUB_STATUS_NAME = "PublishingStatus"; END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE diff --git a/mqtt_streaming_module/include/mqtt_streaming_module/handler_factory.h b/mqtt_streaming_module/include/mqtt_streaming_module/handler_factory.h index e2720a0c..d3724952 100644 --- a/mqtt_streaming_module/include/mqtt_streaming_module/handler_factory.h +++ b/mqtt_streaming_module/include/mqtt_streaming_module/handler_factory.h @@ -31,7 +31,8 @@ class HandlerFactory { if (config.sharedTs) { - return std::make_unique(config.useSignalNames, publisherFbGlobalId); + return std::make_unique(config.useSignalNames, + config.topicName.empty() ? publisherFbGlobalId : config.topicName); } else if (config.topicMode == TopicMode::Single) { @@ -42,7 +43,8 @@ class HandlerFactory } else if (config.topicMode == TopicMode::Multi) { - return std::make_unique(config.useSignalNames, publisherFbGlobalId); + return std::make_unique(config.useSignalNames, + config.topicName.empty() ? publisherFbGlobalId : config.topicName); } return std::make_unique(config.useSignalNames); diff --git a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_base_fb.h b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_base_fb.h index 0ed22032..e9ca380c 100644 --- a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_base_fb.h +++ b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_base_fb.h @@ -25,6 +25,29 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE class MqttBaseFb : public FunctionBlock { public: + enum class SubscriptionStatus : EnumType + { + InvalidTopicName = 0, + SubscribingError, + WaitingForData, + HasData + }; + + struct CmdResult + { + bool success = false; + std::string msg; + int token = 0; + + CmdResult(bool success = false, const std::string& msg = "", int token = 0) + : success(success), + msg(msg), + token(token) + { + } + }; + + explicit MqttBaseFb(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, @@ -34,7 +57,10 @@ class MqttBaseFb : public FunctionBlock ~MqttBaseFb() = default; protected: + static std::vector> subscriptionStatusMap; + std::shared_ptr subscriber; + EnumerationPtr subscriptionStatus; virtual void createSignals() = 0; virtual void processMessage(const mqtt::MqttMessage& msg) = 0; @@ -44,12 +70,17 @@ class MqttBaseFb : public FunctionBlock void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, const mqtt::MqttMessage& msg); - virtual std::vector getSubscribedTopics() const = 0; - virtual void clearSubscribedTopics() = 0; - virtual void subscribeToTopics(); - virtual void unsubscribeFromTopics(); + virtual std::string getSubscribedTopic() const = 0; + virtual void clearSubscribedTopic() = 0; + virtual CmdResult subscribeToTopic(); + virtual CmdResult unsubscribeFromTopic(); + + virtual void propertyChanged() = 0; void removed() override; + + void initSubscriptionStatus(); + void setSubscriptionStatus(const SubscriptionStatus status, std::string message = ""); }; END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE diff --git a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h index 7416fff3..0585ff13 100644 --- a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h +++ b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h @@ -31,7 +31,6 @@ class MqttJsonReceiverFbImpl final : public MqttBaseFb explicit MqttJsonReceiverFbImpl(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, - const StringPtr& localId, std::shared_ptr subscriber, const PropertyObjectPtr& config = nullptr); ~MqttJsonReceiverFbImpl() override; @@ -42,14 +41,19 @@ class MqttJsonReceiverFbImpl final : public MqttBaseFb mutable std::mutex sync; mqtt::MqttDataWrapper jsonDataWorker; std::unordered_map outputSignals; - std::vector signalIdList; - std::unordered_map subscribedSignals; + std::vector signalNameList; + std::unordered_map subscribedSignals; + std::string topicForSubscribing; + static std::atomic localIndex; + + static std::string getLocalId(); void createSignals() override; - void clearSubscribedTopics() override; - std::vector getSubscribedTopics() const override; + void clearSubscribedTopic() override; + std::string getSubscribedTopic() const override; void processMessage(const mqtt::MqttMessage& msg) override; void readProperties() override; + void propertyChanged() override; void createDataPacket(const std::string& topic, const std::string& json); }; diff --git a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_publisher_fb_impl.h b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_publisher_fb_impl.h index 5b2ddded..5c13e8f5 100644 --- a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_publisher_fb_impl.h +++ b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_publisher_fb_impl.h @@ -18,6 +18,7 @@ #include "MqttAsyncClient.h" #include "MqttDataWrapper.h" #include "mqtt_streaming_module/handler_base.h" +#include "mqtt_streaming_helper/timer.h" #include #include #include @@ -27,6 +28,19 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE class MqttPublisherFbImpl final : public FunctionBlock { public: + enum class SignalStatus : EnumType + { + NotConnected = 0, + Invalid, + Valid + }; + + enum class PublishingStatus : EnumType + { + Ok = 0, + SampleSkipped + }; + explicit MqttPublisherFbImpl(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, @@ -40,7 +54,12 @@ class MqttPublisherFbImpl final : public FunctionBlock void onConnected(const InputPortPtr& port) override; void onDisconnected(const InputPortPtr& port) override; + static void addTypesToTypeManager(daq::TypeManagerPtr manager); + private: + static const std::vector> signalStatusMap; + static const std::vector> publishingStatusMap; + static std::atomic localIndex; std::shared_ptr mqttClient; mqtt::MqttDataWrapper jsonDataWorker; @@ -51,10 +70,20 @@ class MqttPublisherFbImpl final : public FunctionBlock std::atomic running; std::atomic hasError; std::unique_ptr handler; + EnumerationPtr signalStatus; + EnumerationPtr publishingStatus; + uint64_t skippedMsgCnt; + uint64_t publishedMsgCnt; + std::string lastSkippedReason; + helper::utils::Timer publishingStatusTimer; static std::string getLocalId(); + void setSignalStatus(const SignalStatus status, std::string message = "", bool init = false); + void setPublishingStatus(const PublishingStatus status, std::string message = "", bool init = false); + void updatePublishingStatus(); void initProperties(const PropertyObjectPtr& config); void readProperties(); + void propertyChanged(); void updateInputPorts(); void validateInputPorts(); template diff --git a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_raw_receiver_fb_impl.h b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_raw_receiver_fb_impl.h index 4c0ead3a..a1352be3 100644 --- a/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_raw_receiver_fb_impl.h +++ b/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_raw_receiver_fb_impl.h @@ -30,7 +30,6 @@ class MqttRawReceiverFbImpl final : public MqttBaseFb explicit MqttRawReceiverFbImpl(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, - const StringPtr& localId, std::shared_ptr subscriber, const PropertyObjectPtr& config = nullptr); ~MqttRawReceiverFbImpl() override; @@ -38,14 +37,18 @@ class MqttRawReceiverFbImpl final : public MqttBaseFb static FunctionBlockTypePtr CreateType(); private: std::mutex sync; - std::unordered_map outputSignals; - std::vector topicsForSubscribing; + SignalConfigPtr outputSignal; + std::string topicForSubscribing; + static std::atomic localIndex; + + static std::string getLocalId(); void createSignals() override; - void clearSubscribedTopics() override; - std::vector getSubscribedTopics() const override; + void clearSubscribedTopic() override; + std::string getSubscribedTopic() const override; void processMessage(const mqtt::MqttMessage& msg) override; void readProperties() override; + void propertyChanged() override; }; END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE diff --git a/mqtt_streaming_module/include/mqtt_streaming_module/types.h b/mqtt_streaming_module/include/mqtt_streaming_module/types.h index d9176b9e..0f758ff6 100644 --- a/mqtt_streaming_module/include/mqtt_streaming_module/types.h +++ b/mqtt_streaming_module/include/mqtt_streaming_module/types.h @@ -19,6 +19,7 @@ enum class TopicMode { struct PublisherFbConfig { TopicMode topicMode; + std::string topicName; bool sharedTs; bool groupValues; bool useSignalNames; diff --git a/mqtt_streaming_module/src/CMakeLists.txt b/mqtt_streaming_module/src/CMakeLists.txt index 1333d75a..a878872d 100644 --- a/mqtt_streaming_module/src/CMakeLists.txt +++ b/mqtt_streaming_module/src/CMakeLists.txt @@ -86,6 +86,7 @@ endif() target_link_libraries(${LIB_NAME} PUBLIC daq::opendaq mqtt_streaming_protocol + mqtt_streaming_helper PRIVATE $ ) diff --git a/mqtt_streaming_module/src/mqtt_base_fb.cpp b/mqtt_streaming_module/src/mqtt_base_fb.cpp index 9fca412a..42618967 100644 --- a/mqtt_streaming_module/src/mqtt_base_fb.cpp +++ b/mqtt_streaming_module/src/mqtt_base_fb.cpp @@ -1,9 +1,16 @@ +#include "mqtt_streaming_module/constants.h" #include BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE constexpr int MQTT_FB_UNSUBSCRIBE_TOUT = 3000; +std::vector> MqttBaseFb::subscriptionStatusMap = + {{SubscriptionStatus::InvalidTopicName, "InvalidTopicName"}, + {SubscriptionStatus::SubscribingError, "SubscribingError"}, + {SubscriptionStatus::WaitingForData, "WaitingForData"}, + {SubscriptionStatus::HasData, "HasData"}}; + MqttBaseFb::MqttBaseFb(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, @@ -14,13 +21,15 @@ MqttBaseFb::MqttBaseFb(const ContextPtr& ctx, , subscriber(subscriber) { initComponentStatus(); + initSubscriptionStatus(); } - void MqttBaseFb::removed() { FunctionBlock::removed(); - unsubscribeFromTopics(); + unsubscribeFromTopic(); + setSubscriptionStatus(SubscriptionStatus::InvalidTopicName, "Function block removed"); + setComponentStatusWithMessage(ComponentStatus::Error, "Function block removed"); } void MqttBaseFb::onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, const mqtt::MqttMessage& msg) @@ -38,6 +47,8 @@ void MqttBaseFb::initProperties(const PropertyObjectPtr& config) if (const auto internalProp = prop.asPtrOrNull(true); internalProp.assigned()) { objPtr.addProperty(internalProp.clone()); + objPtr.getOnPropertyValueWrite(prop.getName()) += + [this](PropertyObjectPtr& obj, PropertyValueEventArgsPtr& args) { propertyChanged(); }; } } objPtr.setPropertyValue(propName, prop.getValue()); @@ -45,64 +56,111 @@ void MqttBaseFb::initProperties(const PropertyObjectPtr& config) readProperties(); } -void MqttBaseFb::subscribeToTopics() +MqttBaseFb::CmdResult MqttBaseFb::subscribeToTopic() { + MqttBaseFb::CmdResult result{false}; if (subscriber) { - bool success = true; auto lambda = [this](const mqtt::MqttAsyncClient &client, mqtt::MqttMessage &msg){this->onSignalsMessage(client, msg);}; - if (!getSubscribedTopics().empty()) - { - LOG_I("Trying to subscribe to the topics"); - } - for (const auto& topic : getSubscribedTopics()) + const auto topic = getSubscribedTopic(); + if (!topic.empty()) { + LOG_I("Trying to subscribe to the topic : {}", topic); subscriber->setMessageArrivedCb(topic, lambda); - auto result = subscriber->subscribe(topic, 1); - success &= result.success; - if (!result.success) + if (auto subRes = subscriber->subscribe(topic, 1); subRes.success == false) { - LOG_W("Failed to subscribe to the topic: {}; reason: {}", topic, result.msg); + LOG_W("Failed to subscribe to the topic: \"{}\"; reason: {}", topic, subRes.msg); + setComponentStatusWithMessage(ComponentStatus::Warning, "Some topics failed to subscribe!"); + setSubscriptionStatus(SubscriptionStatus::SubscribingError, "The reason: " + subRes.msg); + result = {false, "Failed to subscribe to the topic: \"" + topic + "\"; reason: " + subRes.msg}; } else { // subscriber->subscribe(...) is asynchronous. It puts command in queue and returns immediately. LOG_D("Trying to subscribe to the topic: {}", topic); + setComponentStatus(ComponentStatus::Ok); + setSubscriptionStatus(SubscriptionStatus::WaitingForData, "Topic: " + topic); + result = {true, "", result.token}; } } - if (!success) - setComponentStatusWithMessage(ComponentStatus::Warning, "Some topics failed to subscribe!"); + else + { + result = {false, "Couldn't subscribe to an empty topic"}; + } } else { - setComponentStatusWithMessage(ComponentStatus::Error, "MQTT subscriber client is not set!"); + const std::string msg = "MQTT subscriber client is not set!"; + setComponentStatusWithMessage(ComponentStatus::Error, msg); + setSubscriptionStatus(SubscriptionStatus::SubscribingError, msg); + result = {false, msg}; } + return result; } -void MqttBaseFb::unsubscribeFromTopics() +MqttBaseFb::CmdResult MqttBaseFb::unsubscribeFromTopic() { - if (!subscriber) + MqttBaseFb::CmdResult result{true}; + if (subscriber) { - LOG_E("The subscriber is null"); - return; - } - const auto topics = getSubscribedTopics(); - if (topics.empty()) - return; - subscriber->setMessageArrivedCb(topics, nullptr); - auto result = subscriber->unsubscribe(topics); - if (result.success) - result = subscriber->waitForCompletion(result.token, MQTT_FB_UNSUBSCRIBE_TOUT); + const auto topic = getSubscribedTopic(); + if (!topic.empty()) + { + subscriber->setMessageArrivedCb(topic, nullptr); + mqtt::CmdResult unsubRes = subscriber->unsubscribe(topic); + if (unsubRes.success) + unsubRes = subscriber->waitForCompletion(unsubRes.token, MQTT_FB_UNSUBSCRIBE_TOUT); - if (result.success) - { - clearSubscribedTopics(); - LOG_I("All topics have been unsubscribed successfully"); + if (unsubRes.success) + { + clearSubscribedTopic(); + LOG_I("The topic \'{}\' has been unsubscribed successfully", topic); + result = {true}; + } + else + { + const auto msg = fmt::format("Failed to unsubscribe from the topic \'{}\'; reason: {}", topic, unsubRes.msg); + LOG_W("{}", msg); + setComponentStatus(ComponentStatus::Warning); + setSubscriptionStatus(SubscriptionStatus::SubscribingError, msg); + result = {false, msg}; + } + } } else { - LOG_W("Failed to unsubscribe from all topics; reason: {}", result.msg); + const std::string msg = "MQTT subscriber client is not set!"; + setComponentStatusWithMessage(ComponentStatus::Error, msg); + setSubscriptionStatus(SubscriptionStatus::SubscribingError, msg); + result = {false, msg}; } + return result; +} + +void MqttBaseFb::initSubscriptionStatus() +{ + if (!context.getTypeManager().hasType(MQTT_RAW_FB_SUB_STATUS_TYPE)) + { + auto list = List(); + for (const auto& [_, st] : subscriptionStatusMap) + list.pushBack(st); + + context.getTypeManager().addType(EnumerationType(MQTT_RAW_FB_SUB_STATUS_TYPE, list)); + } + + subscriptionStatus = EnumerationWithIntValue(MQTT_RAW_FB_SUB_STATUS_TYPE, + static_cast(SubscriptionStatus::InvalidTopicName), + this->context.getTypeManager()); + statusContainer.template asPtr(true).addStatus("SubscriptionStatus", + subscriptionStatus); +} + +void MqttBaseFb::setSubscriptionStatus(const SubscriptionStatus status, std::string message) +{ + subscriptionStatus = EnumerationWithIntValue(MQTT_RAW_FB_SUB_STATUS_TYPE, static_cast(status), this->context.getTypeManager()); + statusContainer.template asPtr(true).setStatusWithMessage("SubscriptionStatus", + subscriptionStatus, + message); } END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE diff --git a/mqtt_streaming_module/src/mqtt_json_receiver_fb_impl.cpp b/mqtt_streaming_module/src/mqtt_json_receiver_fb_impl.cpp index 1aac524b..fe4dd390 100644 --- a/mqtt_streaming_module/src/mqtt_json_receiver_fb_impl.cpp +++ b/mqtt_streaming_module/src/mqtt_json_receiver_fb_impl.cpp @@ -2,19 +2,19 @@ #include #include #include -#include BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE constexpr int MQTT_JSON_FB_UNSUBSCRIBE_TOUT = 3000; +std::atomic MqttJsonReceiverFbImpl::localIndex = 0; + MqttJsonReceiverFbImpl::MqttJsonReceiverFbImpl(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, - const StringPtr& localId, std::shared_ptr subscriber, const PropertyObjectPtr& config) - : MqttBaseFb(ctx, parent, type, localId, subscriber, config), + : MqttBaseFb(ctx, parent, type, getLocalId(), subscriber, config), jsonDataWorker(loggerComponent) { if (config.assigned()) @@ -23,12 +23,12 @@ MqttJsonReceiverFbImpl::MqttJsonReceiverFbImpl(const ContextPtr& ctx, initProperties(type.createDefaultConfig()); createSignals(); - subscribeToTopics(); + subscribeToTopic(); } MqttJsonReceiverFbImpl::~MqttJsonReceiverFbImpl() { - unsubscribeFromTopics(); + unsubscribeFromTopic(); } FunctionBlockTypePtr MqttJsonReceiverFbImpl::CreateType() @@ -47,26 +47,47 @@ FunctionBlockTypePtr MqttJsonReceiverFbImpl::CreateType() return fbType; } +std::string MqttJsonReceiverFbImpl::getLocalId() +{ + return std::string(MQTT_LOCAL_JSON_FB_ID_PREFIX + std::to_string(localIndex++)); +} + void MqttJsonReceiverFbImpl::readProperties() { - auto lock = std::scoped_lock(sync); + auto lock = std::lock_guard(sync); subscribedSignals.clear(); - signalIdList.clear(); + signalNameList.clear(); + topicForSubscribing.clear(); bool isPresent = false; if (objPtr.hasProperty(PROPERTY_NAME_SIGNAL_LIST)) { - auto signalConfig = objPtr.getPropertyValue(PROPERTY_NAME_SIGNAL_LIST).asPtrOrNull(); + const auto signalConfig = objPtr.getPropertyValue(PROPERTY_NAME_SIGNAL_LIST).asPtrOrNull(); if (signalConfig.assigned()) { isPresent = true; jsonDataWorker.setConfig(signalConfig.toStdString()); - auto listSubscribedSignals = jsonDataWorker.extractDescription(); - LOG_I("Signal in the list (topic | signal name):"); - for (const auto& [signalId, descriptor] : listSubscribedSignals) + const auto listSubscribedSignals = jsonDataWorker.extractDescription(); + if (!listSubscribedSignals.empty()) { - subscribedSignals.emplace(signalId, descriptor); - signalIdList.push_back(signalId); - LOG_I("\t\"{}\" | \"{}\"", signalId.topic, signalId.signalName); + bool isOneTopic = + std::all_of(listSubscribedSignals.cbegin(), + listSubscribedSignals.cend(), + [&listSubscribedSignals](const auto& s) { return s.first.topic == listSubscribedSignals.front().first.topic; }); + if (!isOneTopic) + { + LOG_E("The JSON config has wrong format (more then one topic found)"); + } + else + { + topicForSubscribing = listSubscribedSignals.front().first.topic; + LOG_I("Signal in the list for the topic \"{}\":", topicForSubscribing); + for (const auto& [signalId, descriptor] : listSubscribedSignals) + { + subscribedSignals.emplace(signalId.signalName, descriptor); + signalNameList.push_back(signalId.signalName); + LOG_I("\t\"{}\"", signalId.signalName); + } + } } } } @@ -80,9 +101,13 @@ void MqttJsonReceiverFbImpl::readProperties() } } +void MqttJsonReceiverFbImpl::propertyChanged() +{ +} + void MqttJsonReceiverFbImpl::createDataPacket(const std::string& topic, const std::string& json) { - auto lock = std::scoped_lock(sync); + auto lock = std::lock_guard(sync); jsonDataWorker.createAndSendDataPacket(topic, json); } @@ -95,29 +120,26 @@ void MqttJsonReceiverFbImpl::processMessage(const mqtt::MqttMessage& msg) void MqttJsonReceiverFbImpl::createSignals() { - auto lock = std::scoped_lock(sync); + auto lock = std::lock_guard(sync); if (!subscribedSignals.empty()) - { LOG_I("Creating signals..."); - } - for (const auto& signalId : signalIdList) + for (const auto& signalName : signalNameList) { - auto iter = subscribedSignals.find(signalId); + auto iter = subscribedSignals.find(signalName); if (iter == subscribedSignals.end()) { - LOG_W("\tSignal \"{}\" on topic \"{}\" is not in the subscribed signal list!", signalId.signalName, signalId.topic); + LOG_W("\tSignal \"{}\" is not in the subscribed signal list!", signalName); continue; } - LOG_D("\tfor the topic \"{}\"", signalId.signalName, signalId.topic); - const std::string& topic = signalId.topic; + LOG_D("\tfor the signal \"{}\"", signalName); auto signalDsc = iter->second; - - auto refS = - outputSignals - .emplace(std::make_pair(signalId, createAndAddSignal(buildSignalNameFromTopic(topic, signalId.signalName), signalDsc))) - .first; + const mqtt::SignalId signalId{topicForSubscribing, signalName}; + auto refS = outputSignals + .emplace(std::make_pair(signalId, + createAndAddSignal(buildSignalNameFromTopic(topicForSubscribing, signalName), signalDsc))) + .first; if (jsonDataWorker.hasDomainSignal(signalId)) { LOG_D("\tThe signal has a domain signal"); @@ -137,7 +159,7 @@ void MqttJsonReceiverFbImpl::createSignals() .setName("Time") .build(); refS->second->setDomainSignal( - createAndAddSignal(buildDomainSignalNameFromTopic(topic, signalId.signalName), domainSignalDsc, false)); + createAndAddSignal(buildDomainSignalNameFromTopic(topicForSubscribing, signalName), domainSignalDsc, false)); } else { @@ -147,22 +169,18 @@ void MqttJsonReceiverFbImpl::createSignals() jsonDataWorker.setOutputSignals(&outputSignals); } -std::vector MqttJsonReceiverFbImpl::getSubscribedTopics() const +std::string MqttJsonReceiverFbImpl::getSubscribedTopic() const { - auto lock = std::scoped_lock(sync); - std::set topicsSet; - for (const auto& [signalId, _] : subscribedSignals) - { - topicsSet.emplace(signalId.topic); - } - return std::vector(topicsSet.cbegin(), topicsSet.cend()); + auto lock = std::lock_guard(sync); + return topicForSubscribing; } -void MqttJsonReceiverFbImpl::clearSubscribedTopics() +void MqttJsonReceiverFbImpl::clearSubscribedTopic() { - auto lock = std::scoped_lock(sync); + auto lock = std::lock_guard(sync); subscribedSignals.clear(); - signalIdList.clear(); + signalNameList.clear(); + topicForSubscribing.clear(); } END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE diff --git a/mqtt_streaming_module/src/mqtt_publisher_fb_impl.cpp b/mqtt_streaming_module/src/mqtt_publisher_fb_impl.cpp index 6e71fd6d..85f67918 100644 --- a/mqtt_streaming_module/src/mqtt_publisher_fb_impl.cpp +++ b/mqtt_streaming_module/src/mqtt_publisher_fb_impl.cpp @@ -9,6 +9,15 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE std::atomic MqttPublisherFbImpl::localIndex = 0; +const std::vector> MqttPublisherFbImpl::signalStatusMap = + {{SignalStatus::NotConnected, "NotConnected"}, + {SignalStatus::Invalid, "Invalid"}, + {SignalStatus::Valid, "Valid"}}; + +const std::vector> MqttPublisherFbImpl::publishingStatusMap = + {{PublishingStatus::Ok, "Ok"}, + {PublishingStatus::SampleSkipped, "SampleSkipped"}}; + MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, @@ -19,9 +28,14 @@ MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx, jsonDataWorker(loggerComponent), inputPortCount(0), running(true), - hasError(false) + hasError(false), + skippedMsgCnt(0), + publishingStatusTimer(helper::utils::Timer(1000, false)) { initComponentStatus(); + addTypesToTypeManager(context.getTypeManager()); + setSignalStatus(SignalStatus::NotConnected, "", true); + setPublishingStatus(PublishingStatus::Ok, "", true); if (config.assigned()) initProperties(populateDefaultConfig(type.createDefaultConfig(), config)); else @@ -51,6 +65,14 @@ FunctionBlockTypePtr MqttPublisherFbImpl::CreateType() "default it is set to single-topic mode."); defaultConfig.addProperty(builder.build()); } + { + auto builder = + StringPropertyBuilder(PROPERTY_NAME_PUB_TOPIC_NAME, "") + .setDescription( + "Topic name for publishing in multiple-topic mode. If left empty, the Publisher's Global ID is used as the topic name.") + .setVisible(EvalValue(std::string("$") + PROPERTY_NAME_PUB_TOPIC_MODE + " == 1")); + defaultConfig.addProperty(builder.build()); + } { auto builder = BoolPropertyBuilder(PROPERTY_NAME_PUB_SHARED_TS, False) .setVisible(EvalValue(std::string("$") + PROPERTY_NAME_PUB_TOPIC_MODE + " == 1")) @@ -157,20 +179,35 @@ void MqttPublisherFbImpl::updateInputPorts() void MqttPublisherFbImpl::validateInputPorts() { - const auto status = handler->validateSignalContexts(signalContexts); - hasError = !status.success; - if (!status.success) + skippedMsgCnt = 0; + publishedMsgCnt = 0; + setPublishingStatus(PublishingStatus::Ok); + if (signalContexts.size() == 1) // no one input port is connected { - setComponentStatusWithMessage(ComponentStatus::Error, "Some connected signals were invalidated!"); - for (const auto& msg : status.messages) - { - LOG_E("{}", msg); - } + setComponentStatus(ComponentStatus::Ok); + setSignalStatus(SignalStatus::NotConnected); } else { - setComponentStatus(ComponentStatus::Ok); - handler->signalListChanged(signalContexts); + const auto status = handler->validateSignalContexts(signalContexts); + hasError = !status.success; + if (!status.success) + { + setComponentStatusWithMessage(ComponentStatus::Error, "Some connected signals were invalidated!"); + std::string allMessages; + for (const auto& msg : status.messages) + { + LOG_E("{}", msg); + allMessages += msg + "; "; + } + setSignalStatus(SignalStatus::Invalid, allMessages); + } + else + { + setComponentStatus(ComponentStatus::Ok); + setSignalStatus(SignalStatus::Valid); + handler->signalListChanged(signalContexts); + } } } @@ -184,6 +221,8 @@ void MqttPublisherFbImpl::initProperties(const PropertyObjectPtr& config) if (const auto internalProp = prop.asPtrOrNull(true); internalProp.assigned()) { objPtr.addProperty(internalProp.clone()); + objPtr.getOnPropertyValueWrite(prop.getName()) += + [this](PropertyObjectPtr& obj, PropertyValueEventArgsPtr& args) { propertyChanged(); }; } } objPtr.setPropertyValue(propName, prop.getValue()); @@ -194,7 +233,7 @@ void MqttPublisherFbImpl::initProperties(const PropertyObjectPtr& config) void MqttPublisherFbImpl::readProperties() { auto lock = this->getRecursiveConfigLock(); - int tmpTopicMode = readProperty(PROPERTY_NAME_PUB_TOPIC_MODE, false); + int tmpTopicMode = readProperty(PROPERTY_NAME_PUB_TOPIC_MODE, 0); if (tmpTopicMode < static_cast(TopicMode::_count) && tmpTopicMode >= 0) config.topicMode = static_cast(tmpTopicMode); else @@ -209,6 +248,15 @@ void MqttPublisherFbImpl::readProperties() config.periodMs = readProperty(PROPERTY_NAME_PUB_READ_PERIOD, DEFAULT_PUB_READ_PERIOD); if (config.periodMs < 0) config.periodMs = DEFAULT_PUB_READ_PERIOD; + config.topicName = readProperty(PROPERTY_NAME_PUB_TOPIC_NAME, DEFAULT_PUB_TOPIC_NAME); +} + +void MqttPublisherFbImpl::propertyChanged() +{ + auto lock = this->getRecursiveConfigLock(); + readProperties(); + handler = HandlerFactory::create(this->config, globalId.toStdString()); + validateInputPorts(); } template @@ -244,6 +292,7 @@ void MqttPublisherFbImpl::readerLoop() msgs = handler->processSignalContexts(signalContexts); } sendMessages(msgs); + updatePublishingStatus(); std::this_thread::sleep_for(std::chrono::milliseconds(config.periodMs)); } } @@ -252,10 +301,16 @@ void MqttPublisherFbImpl::sendMessages(const MqttData& data) { for (const auto& [topic, msg] : data) { - auto status = mqttClient->publish(topic, (void*)msg.c_str(), msg.length(), 1); + auto status = mqttClient->publish(topic, (void*)msg.c_str(), msg.length(), config.qos); if (!status.success) { - LOG_W("Failed to publish data to {}; reason - {}", topic, status.msg); + ++skippedMsgCnt; + lastSkippedReason = std::move(status.msg); + LOG_W("Failed to publish data to {}; reason - {}", topic, lastSkippedReason); + } + else + { + ++publishedMsgCnt; } } } @@ -264,4 +319,73 @@ std::string MqttPublisherFbImpl::getLocalId() { return std::string(MQTT_LOCAL_PUB_FB_ID_PREFIX + std::to_string(localIndex++)); } + +void MqttPublisherFbImpl::setSignalStatus(const SignalStatus status, std::string message, bool init) +{ + signalStatus = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE, static_cast(status), context.getTypeManager()); + if (init) + statusContainer.template asPtr(true).addStatusWithMessage(MQTT_PUB_FB_SIG_STATUS_NAME, + signalStatus, + message); + else + statusContainer.template asPtr(true).setStatusWithMessage(MQTT_PUB_FB_SIG_STATUS_NAME, + signalStatus, + message); +} + +void MqttPublisherFbImpl::updatePublishingStatus() +{ + if (publishingStatusTimer.expired()) + { + publishingStatusTimer.restart(); + if (skippedMsgCnt != 0) + { + if (statusContainer.getStatus("ComponentStatus") == ComponentStatus::Ok) + setComponentStatusWithMessage(ComponentStatus::Warning, "Some messages were not published!"); + setPublishingStatus(PublishingStatus::SampleSkipped, + fmt::format("Published: {}; Skipped: {}; last reason - {}", + publishedMsgCnt, + skippedMsgCnt, + lastSkippedReason)); + } + else + { + setPublishingStatus(PublishingStatus::Ok, fmt::format("Published: {};", publishedMsgCnt)); + } + } +} + +void MqttPublisherFbImpl::setPublishingStatus(const PublishingStatus status, std::string message, bool init) +{ + publishingStatus = EnumerationWithIntValue(MQTT_PUB_FB_PUB_STATUS_TYPE, static_cast(status), context.getTypeManager()); + if (init) + statusContainer.template asPtr(true).addStatusWithMessage(MQTT_PUB_FB_PUB_STATUS_NAME, + publishingStatus, + message); + else + statusContainer.template asPtr(true).setStatusWithMessage(MQTT_PUB_FB_PUB_STATUS_NAME, + publishingStatus, + message); +} + +void MqttPublisherFbImpl::addTypesToTypeManager(daq::TypeManagerPtr manager) +{ + if (!manager.hasType(MQTT_PUB_FB_SIG_STATUS_TYPE)) + { + auto list = List(); + for (const auto& [_, st] : signalStatusMap) + list.pushBack(st); + + manager.addType(EnumerationType(MQTT_PUB_FB_SIG_STATUS_TYPE, list)); + } + + if (!manager.hasType(MQTT_PUB_FB_PUB_STATUS_TYPE)) + { + auto list = List(); + for (const auto& [_, st] : publishingStatusMap) + list.pushBack(st); + + manager.addType(EnumerationType(MQTT_PUB_FB_PUB_STATUS_TYPE, list)); + } +} END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE diff --git a/mqtt_streaming_module/src/mqtt_raw_receiver_fb_impl.cpp b/mqtt_streaming_module/src/mqtt_raw_receiver_fb_impl.cpp index eafa825e..341bbf8f 100644 --- a/mqtt_streaming_module/src/mqtt_raw_receiver_fb_impl.cpp +++ b/mqtt_streaming_module/src/mqtt_raw_receiver_fb_impl.cpp @@ -9,13 +9,14 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE constexpr int MQTT_RAW_FB_UNSUBSCRIBE_TOUT = 3000; +std::atomic MqttRawReceiverFbImpl::localIndex = 0; + MqttRawReceiverFbImpl::MqttRawReceiverFbImpl(const ContextPtr& ctx, const ComponentPtr& parent, const FunctionBlockTypePtr& type, - const StringPtr& localId, std::shared_ptr subscriber, const PropertyObjectPtr& config) - : MqttBaseFb(ctx, parent, type, localId, subscriber, config) + : MqttBaseFb(ctx, parent, type, getLocalId(), subscriber, config) { if (config.assigned()) initProperties(populateDefaultConfig(type.createDefaultConfig(), config)); @@ -23,105 +24,115 @@ MqttRawReceiverFbImpl::MqttRawReceiverFbImpl(const ContextPtr& ctx, initProperties(type.createDefaultConfig()); createSignals(); - subscribeToTopics(); + subscribeToTopic(); } MqttRawReceiverFbImpl::~MqttRawReceiverFbImpl() { - unsubscribeFromTopics(); + unsubscribeFromTopic(); } FunctionBlockTypePtr MqttRawReceiverFbImpl::CreateType() { auto defaultConfig = PropertyObject(); - auto builder = ListPropertyBuilder(PROPERTY_NAME_SIGNAL_LIST, List()) - .setDescription("List of MQTT topics to subscribe to for receiving raw binary data."); + auto builder = StringPropertyBuilder(PROPERTY_NAME_TOPIC, "") + .setDescription("An MQTT topic to subscribe to for receiving raw binary data."); defaultConfig.addProperty(builder.build()); const auto fbType = FunctionBlockType(RAW_FB_NAME, RAW_FB_NAME, - "The raw MQTT function block allows subscribing to MQTT topics and converting MQTT payloads into " + "The raw MQTT function block allows subscribing to an MQTT topic and converting MQTT payloads into " "openDAQ signal binary data samples.", defaultConfig); return fbType; } +std::string MqttRawReceiverFbImpl::getLocalId() +{ + return std::string(MQTT_LOCAL_RAW_FB_ID_PREFIX + std::to_string(localIndex++)); +} + void MqttRawReceiverFbImpl::readProperties() { - auto lock = std::scoped_lock(sync); - topicsForSubscribing.clear(); + auto lock = std::lock_guard(sync); + topicForSubscribing.clear(); bool isPresent = false; - if (objPtr.hasProperty(PROPERTY_NAME_SIGNAL_LIST)) + if (objPtr.hasProperty(PROPERTY_NAME_TOPIC)) { - auto prop = objPtr.getPropertyValue(PROPERTY_NAME_SIGNAL_LIST).asPtrOrNull(); - if (prop.assigned()) + auto topicStr = objPtr.getPropertyValue(PROPERTY_NAME_TOPIC).asPtrOrNull(); + if (topicStr.assigned()) { isPresent = true; - if (prop.getCount() != 0) + const auto validationStatus = mqtt::MqttDataWrapper::validateTopic(topicStr, loggerComponent); + if (validationStatus.success) { - LOG_I("Topics in the list:"); + LOG_I("An MQTT topic: {}", topicStr.toStdString()); + topicForSubscribing = topicStr.toStdString(); + setComponentStatus(ComponentStatus::Ok); + setSubscriptionStatus(SubscriptionStatus::WaitingForData, "Subscribing to topic: " + topicForSubscribing); } - for (const auto& topic : prop) + else { - auto topicStr = topic.asPtr(); - if (mqtt::MqttDataWrapper::validateTopic(topicStr, loggerComponent)) - { - LOG_I("\t{}", topicStr.toStdString()); - topicsForSubscribing.emplace_back(topicStr.toStdString()); - } + setComponentStatus(ComponentStatus::Warning); + setSubscriptionStatus(SubscriptionStatus::InvalidTopicName, validationStatus.msg); } } } if (!isPresent) { - LOG_W("{} property is missing!", PROPERTY_NAME_SIGNAL_LIST); + LOG_W("\'{}\' property is missing!", PROPERTY_NAME_TOPIC); + setComponentStatus(ComponentStatus::Warning); + setSubscriptionStatus(SubscriptionStatus::InvalidTopicName, "The topic property is not set!"); } - if (topicsForSubscribing.empty()) + if (topicForSubscribing.empty()) { - LOG_W("No topics to subscribe to!"); + LOG_W("No topic to subscribe to!"); } } -void MqttRawReceiverFbImpl::processMessage(const mqtt::MqttMessage& msg) +void MqttRawReceiverFbImpl::propertyChanged() { - std::string topic(msg.getTopic()); - - auto lock = std::scoped_lock(sync); - auto signalIter = outputSignals.find(topic); - if (signalIter == outputSignals.end()) + auto result = unsubscribeFromTopic(); + if (result.success == false) { + LOG_W("Failed to unsubscribe from the previous topic before subscribing to a new one; reason: {}", result.msg); return; } - - const auto& signal = signalIter->second; - const auto outputPacket = BinaryDataPacket(nullptr, signal.getDescriptor(), msg.getData().size()); - memcpy(outputPacket.getData(), msg.getData().data(), msg.getData().size()); - signal.sendPacket(outputPacket); + readProperties(); + result = subscribeToTopic(); } -void MqttRawReceiverFbImpl::createSignals() +void MqttRawReceiverFbImpl::processMessage(const mqtt::MqttMessage& msg) { - auto lock = std::scoped_lock(sync); - if (!topicsForSubscribing.empty()) + const std::string topic(msg.getTopic()); + + auto lock = std::lock_guard(sync); + if (topicForSubscribing == topic) { - LOG_I("Creating signals..."); + if (subscriptionStatus.getIntValue() == static_cast(SubscriptionStatus::WaitingForData)) + { + setSubscriptionStatus(SubscriptionStatus::HasData); + } + const auto outputPacket = BinaryDataPacket(nullptr, outputSignal.getDescriptor(), msg.getData().size()); + memcpy(outputPacket.getData(), msg.getData().data(), msg.getData().size()); + outputSignal.sendPacket(outputPacket); } - for (const auto& topic : topicsForSubscribing) - { - LOG_D("\tfor the topic: {}", topic); +} - const auto signalDsc = DataDescriptorBuilder().setSampleType(SampleType::Binary).build(); - outputSignals.emplace(std::make_pair(topic, createAndAddSignal(buildSignalNameFromTopic(topic, ""), signalDsc))); - } +void MqttRawReceiverFbImpl::createSignals() +{ + auto lock = std::lock_guard(sync); + const auto signalDsc = DataDescriptorBuilder().setSampleType(SampleType::Binary).build(); + outputSignal = createAndAddSignal("mqttValueSignal", signalDsc); } -std::vector MqttRawReceiverFbImpl::getSubscribedTopics() const +std::string MqttRawReceiverFbImpl::getSubscribedTopic() const { - return topicsForSubscribing; + return topicForSubscribing; } -void MqttRawReceiverFbImpl::clearSubscribedTopics() +void MqttRawReceiverFbImpl::clearSubscribedTopic() { - topicsForSubscribing.clear(); + topicForSubscribing.clear(); } END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE diff --git a/mqtt_streaming_module/src/mqtt_root_fb_impl.cpp b/mqtt_streaming_module/src/mqtt_root_fb_impl.cpp index def4b29e..4ea379da 100644 --- a/mqtt_streaming_module/src/mqtt_root_fb_impl.cpp +++ b/mqtt_streaming_module/src/mqtt_root_fb_impl.cpp @@ -217,11 +217,11 @@ FunctionBlockPtr MqttRootFbImpl::onAddFunctionBlock(const StringPtr& typeId, con auto fbTypePtr = baseFbTypes.getOrDefault(typeId); if (fbTypePtr.getName() == RAW_FB_NAME) { - nestedFunctionBlock = createWithImplementation(context, functionBlocks, fbTypePtr, typeId, subscriber, config); + nestedFunctionBlock = createWithImplementation(context, functionBlocks, fbTypePtr, subscriber, config); } else if (fbTypePtr.getName() == JSON_FB_NAME) { - nestedFunctionBlock = createWithImplementation(context, functionBlocks, fbTypePtr, typeId, subscriber, config); + nestedFunctionBlock = createWithImplementation(context, functionBlocks, fbTypePtr, subscriber, config); } else if (fbTypePtr.getName() == PUB_FB_NAME) { diff --git a/mqtt_streaming_module/tests/CMakeLists.txt b/mqtt_streaming_module/tests/CMakeLists.txt index 6b5c6f5a..2e336cf6 100644 --- a/mqtt_streaming_module/tests/CMakeLists.txt +++ b/mqtt_streaming_module/tests/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(${TEST_APP} PRIVATE daq::test_utils ${MODULE_NAME} mqtt_stream_module mqtt_streaming_protocol_test_helper + mqtt_streaming_helper ) add_test(NAME ${TEST_APP} diff --git a/mqtt_streaming_module/tests/test_data.h b/mqtt_streaming_module/tests/test_data.h index 3f182edc..41abdb18 100644 --- a/mqtt_streaming_module/tests/test_data.h +++ b/mqtt_streaming_module/tests/test_data.h @@ -3,7 +3,7 @@ #include #include -inline const std::string VALID_JSON_0 = R"json({ +inline const std::string VALID_JSON_4_TOPIC_0 = R"json({ "openDAQ/RefDev0/IO/AI/RefCh0/Sig/AI0": [ { "AI0": { @@ -49,7 +49,7 @@ inline const std::string VALID_JSON_0 = R"json({ } )json"; -inline const std::string VALID_JSON_1 = R"json({ +inline const std::string VALID_JSON_1_TOPIC_1 = R"json({ "/mirip/UNet3AC2/sensor/data":[ { "temp":{ @@ -82,7 +82,7 @@ inline const std::string VALID_JSON_1 = R"json({ } )json"; -inline const std::string VALID_JSON_2 = R"json({ +inline const std::string VALID_JSON_3_TOPIC_2 = R"json({ "/mirip/UNet3AC2/sensor/data0":[ { "temp":{ @@ -146,6 +146,21 @@ inline const std::string VALID_JSON_2 = R"json({ } )json"; +inline const std::string VALID_JSON_1_TOPIC_3 = R"json({ + "/mirip/UNet3AC2/sensor/data":[ + { + "temp":{ + "Value":"temp", + "Timestamp":"ts", + "Unit":[ + "°C" + ] + } + } + ] +} +)json"; + inline const std::string WILDCARD_JSON_0 = R"json({ "/mirip/UNet3AC2/+/data0":[ { diff --git a/mqtt_streaming_module/tests/test_mqtt_json_fb.cpp b/mqtt_streaming_module/tests/test_mqtt_json_fb.cpp index 2b99b23d..4316c062 100644 --- a/mqtt_streaming_module/tests/test_mqtt_json_fb.cpp +++ b/mqtt_streaming_module/tests/test_mqtt_json_fb.cpp @@ -1,5 +1,5 @@ #include "MqttAsyncClientWrapper.h" -#include "Timer.h" +#include "mqtt_streaming_helper/timer.h" #include "mqtt_streaming_module/mqtt_json_receiver_fb_impl.h" #include "test_daq_test_helper.h" #include "test_data.h" @@ -51,7 +51,7 @@ class MqttJsonFbHelper config.addProperty(StringProperty(PROPERTY_NAME_SIGNAL_LIST, String(""))); const auto fbType = FunctionBlockType(JSON_FB_NAME, JSON_FB_NAME, "", config); config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, jsonConfig); - obj = std::make_unique(NullContext(), nullptr, fbType, "localId", nullptr, config); + obj = std::make_unique(NullContext(), nullptr, fbType, nullptr, config); } auto getSignals() @@ -236,7 +236,7 @@ class MqttJsonFbHelper { std::vector result; - auto timer = Timer(timeoutMs); + auto timer = helper::utils::Timer(timeoutMs); while (!reader.getEmpty() || !timer.expired()) { if (reader.getEmpty()) @@ -417,7 +417,7 @@ TEST_F(MqttJsonFbTest, Config) StartUp(); auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(JSON_FB_NAME).createDefaultConfig(); - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, VALID_JSON_0); + config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, VALID_JSON_1_TOPIC_1); daq::FunctionBlockPtr jsonFb; ASSERT_NO_THROW(jsonFb = rootMqttFb.addFunctionBlock(JSON_FB_NAME, config)); @@ -439,23 +439,6 @@ TEST_F(MqttJsonFbTest, Creation) ASSERT_NO_THROW(jsonFb = rootMqttFb.addFunctionBlock(JSON_FB_NAME)); ASSERT_EQ(jsonFb.getStatusContainer().getStatus("ComponentStatus"), Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); - ASSERT_EQ(jsonFb.getName(), JSON_FB_NAME); - auto fbs = rootMqttFb.getFunctionBlocks(); - bool contain = false; - daq::GenericFunctionBlockPtr fbFromList; - for (const auto& fb : fbs) - { - contain = (fb.getName() == JSON_FB_NAME); - if (contain) - { - fbFromList = fb; - break; - } - } - ASSERT_TRUE(contain); - ASSERT_TRUE(fbFromList.assigned()); - ASSERT_EQ(fbFromList.getName(), jsonFb.getName()); - ASSERT_TRUE(fbFromList == jsonFb); } TEST_F(MqttJsonFbTest, CreationWithDefaultConfig) @@ -475,7 +458,7 @@ TEST_F(MqttJsonFbTest, CreationWithPartialConfig) StartUp(); daq::FunctionBlockPtr jsonFb; auto config = PropertyObject(); - config.addProperty(StringProperty(PROPERTY_NAME_SIGNAL_LIST, String(VALID_JSON_0))); + config.addProperty(StringProperty(PROPERTY_NAME_SIGNAL_LIST, String(VALID_JSON_1_TOPIC_1))); ASSERT_NO_THROW(jsonFb = rootMqttFb.addFunctionBlock(JSON_FB_NAME, config)); ASSERT_EQ(jsonFb.getStatusContainer().getStatus("ComponentStatus"), Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); @@ -487,7 +470,7 @@ TEST_F(MqttJsonFbTest, CreationWithCustomConfig) StartUp(); daq::FunctionBlockPtr jsonFb; auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(JSON_FB_NAME).createDefaultConfig(); - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, String(VALID_JSON_0)); + config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, String(VALID_JSON_1_TOPIC_1)); ASSERT_NO_THROW(jsonFb = rootMqttFb.addFunctionBlock(JSON_FB_NAME, config)); ASSERT_EQ(jsonFb.getStatusContainer().getStatus("ComponentStatus"), Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); @@ -513,9 +496,10 @@ TEST_P(MqttJsonFbRightJsonConfigPTest, CheckNumberOfSignal) INSTANTIATE_TEST_SUITE_P(SignalNumbersTest, MqttJsonFbRightJsonConfigPTest, - ::testing::Values(std::make_pair(VALID_JSON_0, 4), - std::make_pair(VALID_JSON_1, 3), - std::make_pair(VALID_JSON_2, 6))); + ::testing::Values(std::make_pair(VALID_JSON_4_TOPIC_0, 0), + std::make_pair(VALID_JSON_1_TOPIC_1, 3), + std::make_pair(VALID_JSON_3_TOPIC_2, 0), + std::make_pair(VALID_JSON_1_TOPIC_3, 1))); TEST_F(MqttJsonFbTest, SignalListWithWildcard) { diff --git a/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp b/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp index 17c41513..255724e4 100644 --- a/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp +++ b/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp @@ -1,5 +1,5 @@ #include "MqttAsyncClientWrapper.h" -#include "Timer.h" +#include "mqtt_streaming_helper/timer.h" #include "mqtt_streaming_module/mqtt_publisher_fb_impl.h" #include "test_daq_test_helper.h" #include @@ -209,8 +209,14 @@ class MqttPublisherFbHelper : public DaqTestHelper fb = rootMqttFb.addFunctionBlock(PUB_FB_NAME, config); } - void CreatePublisherFB( - bool multiTopic, bool sharedTs, bool groupV, bool useSignalNames, uint valuePackSize = 0, uint qos = 2, uint readPeriod = 20) + void CreatePublisherFB(bool multiTopic, + bool sharedTs, + bool groupV, + bool useSignalNames, + const std::string& topicName, + uint valuePackSize = 0, + uint qos = 2, + uint readPeriod = 20) { auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(PUB_FB_NAME).createDefaultConfig(); config.setPropertyValue(PROPERTY_NAME_PUB_TOPIC_MODE, multiTopic ? 1 : 0); @@ -220,6 +226,7 @@ class MqttPublisherFbHelper : public DaqTestHelper config.setPropertyValue(PROPERTY_NAME_PUB_GROUP_VALUES_PACK_SIZE, valuePackSize); config.setPropertyValue(PROPERTY_NAME_PUB_QOS, qos); config.setPropertyValue(PROPERTY_NAME_PUB_READ_PERIOD, readPeriod); + config.setPropertyValue(PROPERTY_NAME_PUB_TOPIC_NAME, topicName); fb = rootMqttFb.addFunctionBlock(PUB_FB_NAME, config); } @@ -405,7 +412,7 @@ class MqttPublisherFbHelper : public DaqTestHelper std::atomic done{false}; subscriber->expectMsgs(topic, messages, receivedPromise, done); - Timer receiveTimer(5000); + helper::utils::Timer receiveTimer(5000); bool ok = subscriber->subscribe(topic, 2); if (!ok) return false; @@ -416,6 +423,11 @@ class MqttPublisherFbHelper : public DaqTestHelper return ok; } + std::string buildTopicName(const std::string& postfix = "") + { + return std::string("test/topic/publisherFB/") + std::string(::testing::UnitTest::GetInstance()->current_test_info()->name()) + postfix; + } + private: static std::string doubleToString(double value, int precision = 12) { @@ -495,7 +507,7 @@ TEST_F(MqttPublisherFbTest, DefaultConfig) ASSERT_TRUE(defaultConfig.assigned()); - ASSERT_EQ(defaultConfig.getAllProperties().getCount(), 7u); + ASSERT_EQ(defaultConfig.getAllProperties().getCount(), 8u); ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_PUB_TOPIC_MODE)); ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_PUB_TOPIC_MODE).getValueType(), CoreType::ctInt); @@ -526,6 +538,11 @@ TEST_F(MqttPublisherFbTest, DefaultConfig) ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_PUB_READ_PERIOD)); ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_PUB_READ_PERIOD).getValueType(), CoreType::ctInt); ASSERT_EQ(defaultConfig.getPropertyValue(PROPERTY_NAME_PUB_READ_PERIOD).asPtr(), DEFAULT_PUB_READ_PERIOD); + + ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_PUB_TOPIC_NAME)); + ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_PUB_TOPIC_NAME).getValueType(), CoreType::ctString); + ASSERT_EQ(defaultConfig.getPropertyValue(PROPERTY_NAME_PUB_TOPIC_NAME).asPtr().toStdString(), std::string(DEFAULT_PUB_TOPIC_NAME)); + ASSERT_FALSE(defaultConfig.getProperty(PROPERTY_NAME_PUB_TOPIC_NAME).getVisible()); } TEST_F(MqttPublisherFbTest, PropertyVisibility) @@ -551,6 +568,11 @@ TEST_F(MqttPublisherFbTest, PropertyVisibility) ASSERT_TRUE(defaultConfig.getProperty(PROPERTY_NAME_PUB_GROUP_VALUES).getVisible()); defaultConfig.setPropertyValue(PROPERTY_NAME_PUB_TOPIC_MODE, 1); // Set to Multi topic ASSERT_FALSE(defaultConfig.getProperty(PROPERTY_NAME_PUB_GROUP_VALUES).getVisible()); + + defaultConfig.setPropertyValue(PROPERTY_NAME_PUB_TOPIC_MODE, 0); // Set to Single topic + ASSERT_FALSE(defaultConfig.getProperty(PROPERTY_NAME_PUB_TOPIC_NAME).getVisible()); + defaultConfig.setPropertyValue(PROPERTY_NAME_PUB_TOPIC_MODE, 1); // Set to Multi topic + ASSERT_TRUE(defaultConfig.getProperty(PROPERTY_NAME_PUB_TOPIC_NAME).getVisible()); } TEST_F(MqttPublisherFbTest, Config) @@ -565,6 +587,7 @@ TEST_F(MqttPublisherFbTest, Config) config.setPropertyValue(PROPERTY_NAME_PUB_GROUP_VALUES_PACK_SIZE, 3); config.setPropertyValue(PROPERTY_NAME_PUB_QOS, 2); config.setPropertyValue(PROPERTY_NAME_PUB_READ_PERIOD, 100); + config.setPropertyValue(PROPERTY_NAME_PUB_TOPIC_NAME, buildTopicName()); daq::FunctionBlockPtr fb; ASSERT_NO_THROW(fb = rootMqttFb.addFunctionBlock(PUB_FB_NAME, config)); ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), @@ -642,36 +665,53 @@ TEST_F(MqttPublisherFbTest, CreationWithPartialConfig) TEST_F(MqttPublisherFbTest, ConnectToPort) { StartUp(); + MqttPublisherFbImpl::addTypesToTypeManager(rootMqttFb.getContext().getTypeManager()); + const auto sigStValid = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE, + static_cast(MqttPublisherFbImpl::SignalStatus::Valid), + daqInstance.getContext().getTypeManager()); + const auto sigStInvalid = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE, + static_cast(MqttPublisherFbImpl::SignalStatus::Invalid), + daqInstance.getContext().getTypeManager()); + const auto sigStNotConnected = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE, + static_cast(MqttPublisherFbImpl::SignalStatus::NotConnected), + daqInstance.getContext().getTypeManager()); + const auto comStOk = Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()); + const auto comStError = Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()); { daq::FunctionBlockPtr fb; ASSERT_NO_THROW(fb = rootMqttFb.addFunctionBlock(PUB_FB_NAME)); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStNotConnected); auto help = SignalHelper(); ASSERT_EQ(fb.getInputPorts().getCount(), 1u); fb.getInputPorts()[0].connect(help.signal0); ASSERT_EQ(fb.getInputPorts().getCount(), 2u); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); + fb.getInputPorts()[1].connect(help.signal0); ASSERT_EQ(fb.getInputPorts().getCount(), 3u); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); // disconnection fb.getInputPorts()[1].disconnect(); ASSERT_EQ(fb.getInputPorts().getCount(), 2u); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); // connection without a domain signal fb.getInputPorts()[1].connect(help.signalWithoutDomain); ASSERT_EQ(fb.getInputPorts().getCount(), 3u); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); // disconnection fb.getInputPorts()[1].disconnect(); ASSERT_EQ(fb.getInputPorts().getCount(), 2u); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); + fb.getInputPorts()[0].disconnect(); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStNotConnected); } { @@ -686,8 +726,8 @@ TEST_F(MqttPublisherFbTest, ConnectToPort) fb.getInputPorts()[0].connect(signal0); fb.getInputPorts()[1].connect(signal1); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); } { @@ -702,11 +742,11 @@ TEST_F(MqttPublisherFbTest, ConnectToPort) fb.getInputPorts()[0].connect(signal0); fb.getInputPorts()[1].connect(signal1); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStError); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStInvalid); fb.getInputPorts()[1].disconnect(); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); } { @@ -721,11 +761,11 @@ TEST_F(MqttPublisherFbTest, ConnectToPort) fb.getInputPorts()[0].connect(signal0); fb.getInputPorts()[1].connect(signal1); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStError); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStInvalid); fb.getInputPorts()[1].disconnect(); - ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), - Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk); + ASSERT_EQ(fb.getStatusContainer().getStatus(MQTT_PUB_FB_SIG_STATUS_NAME), sigStValid); } } @@ -762,7 +802,7 @@ TEST_P(MqttPublisherFbPTest, TransferSingleGroupValues) { StartUp(); - ASSERT_NO_THROW(CreatePublisherFB(false, false, true, false, packSize)); + ASSERT_NO_THROW(CreatePublisherFB(false, false, true, false, buildTopicName(), packSize)); fb.getInputPorts()[0].connect(help.signal0); ASSERT_TRUE(CreateSubscriber()); @@ -785,8 +825,8 @@ TEST_P(MqttPublisherFbPTest, TransferSharedTs) [&](auto& help) { StartUp(); - - ASSERT_NO_THROW(CreatePublisherFB(true, true, false, false)); + const std::string topic = buildTopicName(); + ASSERT_NO_THROW(CreatePublisherFB(true, true, false, false, topic)); fb.getInputPorts()[0].connect(help.signal0); fb.getInputPorts()[1].connect(help.signal1); @@ -795,7 +835,6 @@ TEST_P(MqttPublisherFbPTest, TransferSharedTs) const auto data = help.generateTestData(sampleCnt); const std::vector messages = expectedMsgsForSharedTs(help.signal0.getGlobalId().toStdString(), help.signal1.getGlobalId().toStdString(), data); - const std::string topic = fb.getGlobalId().toStdString(); auto ok = transfer(topic, messages, help, data); ASSERT_TRUE(ok); }, @@ -810,8 +849,8 @@ TEST_P(MqttPublisherFbPTest, TransferMultimessage) [&](auto& help) { StartUp(); - - ASSERT_NO_THROW(CreatePublisherFB(true, false, false, false)); + const std::string topic = buildTopicName(); + ASSERT_NO_THROW(CreatePublisherFB(true, false, false, false, topic)); fb.getInputPorts()[0].connect(help.signal0); fb.getInputPorts()[1].connect(help.signal1); @@ -820,7 +859,6 @@ TEST_P(MqttPublisherFbPTest, TransferMultimessage) const auto data = help.generateTestData(sampleCnt); const std::vector messages = expectedMsgsForMultimsg(help.signal0.getGlobalId().toStdString(), help.signal1.getGlobalId().toStdString(), data); - const std::string topic = fb.getGlobalId().toStdString(); auto ok = transfer(topic, messages, help, data); ASSERT_TRUE(ok); }, diff --git a/mqtt_streaming_module/tests/test_mqtt_raw_fb.cpp b/mqtt_streaming_module/tests/test_mqtt_raw_fb.cpp index 5a748e17..2f3a9a28 100644 --- a/mqtt_streaming_module/tests/test_mqtt_raw_fb.cpp +++ b/mqtt_streaming_module/tests/test_mqtt_raw_fb.cpp @@ -7,6 +7,7 @@ #include #include #include +#include using namespace daq; using namespace daq::modules::mqtt_streaming_module; @@ -24,25 +25,25 @@ class MqttRawFbTest : public testing::Test, public DaqTestHelper obj->onSignalsMessage(unused, msg); } - void CreateRawFB(std::vector topics) + void CreateRawFB(std::string topic) { auto config = PropertyObject(); - config.addProperty(ListProperty(PROPERTY_NAME_SIGNAL_LIST, List())); + config.addProperty(StringProperty(PROPERTY_NAME_TOPIC, "")); const auto fbType = FunctionBlockType(RAW_FB_NAME, RAW_FB_NAME, "", config); - auto topicList = List(); - for (auto& topic : topics) - { - addToList(topicList, std::move(topic)); - } - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, topicList); - obj = std::make_unique(NullContext(), nullptr, fbType, "localId", nullptr, config); + config.setPropertyValue(PROPERTY_NAME_TOPIC, topic); + obj = std::make_unique(NullContext(), nullptr, fbType, nullptr, config); } - std::string buildTopicName() + std::string buildTopicName(const std::string& postfix = "") { - return std::string("test/topic/") + std::string(::testing::UnitTest::GetInstance()->current_test_info()->name()); + return std::string("test/topic/") + std::string(::testing::UnitTest::GetInstance()->current_test_info()->name()) + postfix; } }; + +class MqttRawFbPTest : public ::testing::TestWithParam>, + public DaqTestHelper +{ +}; } // namespace daq::modules::mqtt_streaming_module TEST_F(MqttRawFbTest, DefaultRawFbConfig) @@ -59,36 +60,21 @@ TEST_F(MqttRawFbTest, DefaultRawFbConfig) ASSERT_EQ(defaultConfig.getAllProperties().getCount(), 1u); - ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_SIGNAL_LIST)); + ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_TOPIC)); - ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_SIGNAL_LIST).getValueType(), CoreType::ctList); - ASSERT_TRUE(defaultConfig.getPropertyValue(PROPERTY_NAME_SIGNAL_LIST).asPtr().empty()); + ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_TOPIC).getValueType(), CoreType::ctString); + ASSERT_EQ(defaultConfig.getPropertyValue(PROPERTY_NAME_TOPIC).asPtr().getLength(), 0u); } -TEST_F(MqttRawFbTest, CreateRawFunctionalBlocks) +TEST_F(MqttRawFbTest, Creation) { StartUp(); daq::FunctionBlockPtr rawFb; - ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME)); + auto config = PropertyObject(); + config.addProperty(StringProperty(PROPERTY_NAME_TOPIC, buildTopicName())); + ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); ASSERT_EQ(rawFb.getStatusContainer().getStatus("ComponentStatus"), Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); - ASSERT_EQ(rawFb.getName(), RAW_FB_NAME); - auto fbs = rootMqttFb.getFunctionBlocks(); - bool contain = false; - daq::GenericFunctionBlockPtr fbFromList; - for (const auto& fb : fbs) - { - contain = (fb.getName() == RAW_FB_NAME); - if (contain) - { - fbFromList = fb; - break; - } - } - ASSERT_TRUE(contain); - ASSERT_TRUE(fbFromList.assigned()); - ASSERT_EQ(fbFromList.getName(), rawFb.getName()); - ASSERT_TRUE(fbFromList == rawFb); } TEST_F(MqttRawFbTest, CheckRawFbWithEmptyConfig) @@ -98,7 +84,30 @@ TEST_F(MqttRawFbTest, CheckRawFbWithEmptyConfig) auto config = PropertyObject(); ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); auto signals = rawFb.getSignals(); - ASSERT_EQ(signals.getCount(), 0u); + ASSERT_EQ(signals.getCount(), 1u); +} + +TEST_F(MqttRawFbTest, TwoFbCreation) +{ + StartUp(); + { + daq::FunctionBlockPtr fb; + auto config = PropertyObject(); + config.addProperty(StringProperty(PROPERTY_NAME_TOPIC, buildTopicName("0"))); + ASSERT_NO_THROW(fb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); + EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + } + { + daq::FunctionBlockPtr fb; + auto config = PropertyObject(); + config.addProperty(StringProperty(PROPERTY_NAME_TOPIC, buildTopicName("1"))); + ASSERT_NO_THROW(fb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); + EXPECT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + } + auto fbs = rootMqttFb.getFunctionBlocks(); + ASSERT_EQ(fbs.getCount(), 2u); } TEST_F(MqttRawFbTest, CheckRawFbWithDefaultConfig) @@ -107,7 +116,7 @@ TEST_F(MqttRawFbTest, CheckRawFbWithDefaultConfig) daq::FunctionBlockPtr rawFb; ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME)); auto signals = rawFb.getSignals(); - ASSERT_EQ(signals.getCount(), 0u); + ASSERT_EQ(signals.getCount(), 1u); } TEST_F(MqttRawFbTest, CheckRawFbWithPartialConfig) @@ -116,7 +125,7 @@ TEST_F(MqttRawFbTest, CheckRawFbWithPartialConfig) StartUp(); daq::FunctionBlockPtr rawFb; auto config = PropertyObject(); - config.addProperty(ListProperty(PROPERTY_NAME_SIGNAL_LIST, List())); + config.addProperty(StringProperty(PROPERTY_NAME_TOPIC, "")); ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); } @@ -126,79 +135,90 @@ TEST_F(MqttRawFbTest, CheckRawFbWithCustomConfig) StartUp(); daq::FunctionBlockPtr rawFb; auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(RAW_FB_NAME).createDefaultConfig(); - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, List(buildTopicName())); + config.setPropertyValue(PROPERTY_NAME_TOPIC, buildTopicName()); ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); } -TEST_F(MqttRawFbTest, CheckRawFbSignalList) +TEST_F(MqttRawFbTest, CheckRawFbConfig) { - constexpr uint NUM_TOPICS = 5u; StartUp(); const auto topic = buildTopicName(); - auto topicList = List(); - for (int i = 0; i < NUM_TOPICS; ++i) - { - addToList(topicList, fmt::format("{}_{}", topic, i)); - } auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(RAW_FB_NAME).createDefaultConfig(); - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, topicList); + config.setPropertyValue(PROPERTY_NAME_TOPIC, topic); daq::FunctionBlockPtr rawFb; ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); - auto signals = rawFb.getSignals(); - ASSERT_EQ(signals.getCount(), NUM_TOPICS); + + const auto allProperties = rawFb.getAllProperties(); + ASSERT_EQ(allProperties.getCount(), config.getAllProperties().getCount()); + + for (const auto& pror : config.getAllProperties()) + { + const auto propName = pror.getName(); + ASSERT_TRUE(rawFb.hasProperty(propName)); + ASSERT_EQ(rawFb.getPropertyValue(propName), config.getPropertyValue(propName)); + } } -TEST_F(MqttRawFbTest, CheckRawFbSignalListWithWildcard) +TEST_F(MqttRawFbTest, CheckRawFbSubscriptionStatusWaitingForData) { StartUp(); - auto topicList = List(); - addToList(topicList, ""); - addToList(topicList, "goodTopic/test/topic"); - addToList(topicList, "badTopic/+/test/topic"); - addToList(topicList, "badTopic/+/+/topic"); - addToList(topicList, "badTopic/#"); - addToList(topicList, "goodTopic/test/newTopic"); - auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(RAW_FB_NAME).createDefaultConfig(); - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, topicList); + config.setPropertyValue(PROPERTY_NAME_TOPIC, buildTopicName()); daq::FunctionBlockPtr rawFb; ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); - auto signals = rawFb.getSignals(); - ASSERT_EQ(signals.getCount(), 2u); + EXPECT_EQ(rawFb.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + + EXPECT_EQ(rawFb.getStatusContainer().getStatus("SubscriptionStatus"), + EnumerationWithIntValue(MQTT_RAW_FB_SUB_STATUS_TYPE, + static_cast(MqttBaseFb::SubscriptionStatus::WaitingForData), + daqInstance.getContext().getTypeManager())); } -TEST_F(MqttRawFbTest, CheckRawFbConfig) +TEST_P(MqttRawFbPTest, CheckRawFbTopic) { - constexpr uint NUM_TOPICS = 5u; + auto [topic, result] = GetParam(); StartUp(); - const auto topic = buildTopicName(); - auto topicList = List(); - for (int i = 0; i < NUM_TOPICS; ++i) - { - addToList(topicList, fmt::format("{}_{}", topic, i)); - } auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(RAW_FB_NAME).createDefaultConfig(); - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, topicList); + config.setPropertyValue(PROPERTY_NAME_TOPIC, topic); daq::FunctionBlockPtr rawFb; ASSERT_NO_THROW(rawFb = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config)); - - const auto allProperties = rawFb.getAllProperties(); - ASSERT_EQ(allProperties.getCount(), config.getAllProperties().getCount()); - - for (const auto& pror : config.getAllProperties()) + auto signals = rawFb.getSignals(); + ASSERT_EQ(signals.getCount(), 1); + const auto expectedComponentStatus = result ? "Ok" : "Warning"; + EXPECT_EQ(rawFb.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", expectedComponentStatus, daqInstance.getContext().getTypeManager())); + if (result) { - const auto propName = pror.getName(); - ASSERT_TRUE(rawFb.hasProperty(propName)); - ASSERT_EQ(rawFb.getPropertyValue(propName), config.getPropertyValue(propName)); + EXPECT_NE(rawFb.getStatusContainer().getStatus("SubscriptionStatus"), + EnumerationWithIntValue(MQTT_RAW_FB_SUB_STATUS_TYPE, + static_cast(MqttBaseFb::SubscriptionStatus::InvalidTopicName), + daqInstance.getContext().getTypeManager())); + } + else + { + EXPECT_EQ(rawFb.getStatusContainer().getStatus("SubscriptionStatus"), + EnumerationWithIntValue(MQTT_RAW_FB_SUB_STATUS_TYPE, + static_cast(MqttBaseFb::SubscriptionStatus::InvalidTopicName), + daqInstance.getContext().getTypeManager())); } } +INSTANTIATE_TEST_SUITE_P(TopicTest, + MqttRawFbPTest, + ::testing::Values(std::make_pair("", false), + std::make_pair("goodTopic/test", true), + std::make_pair("/goodTopic/test0", true), + std::make_pair("badTopic/+/test/topic", false), + std::make_pair("badTopic/+/+/topic", false), + std::make_pair("badTopic/#", false))); + TEST_F(MqttRawFbTest, CheckRawFbDataTransfer) { const auto topic = buildTopicName(); @@ -245,11 +265,9 @@ TEST_F(MqttRawFbTest, CheckRawFbFullDataTransfer) StartUp(); - auto topicList = List(); - addToList(topicList, topic); auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(RAW_FB_NAME).createDefaultConfig(); - config.setPropertyValue(PROPERTY_NAME_SIGNAL_LIST, topicList); + config.setPropertyValue(PROPERTY_NAME_TOPIC, topic); auto singal = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config).getSignals()[0]; auto reader = daq::PacketReader(singal); @@ -287,3 +305,85 @@ TEST_F(MqttRawFbTest, CheckRawFbFullDataTransfer) ASSERT_EQ(dataToSend.size(), dataToReceive.size()); ASSERT_EQ(dataToSend, dataToReceive); } + +TEST_F(MqttRawFbTest, CheckRawFbFullDataTransferWithReconfiguring) +{ + const std::string topic0 = buildTopicName("0"); + const std::string topic1 = buildTopicName("1"); + const auto dataToSend = std::vector>{std::vector{0x01, 0x02, 0x03, 0x04, 0x05}, + std::vector{0x11, 0x12, 0x13, 0x14}}; + std::vector> dataToReceive; + + StartUp(); + + auto config = rootMqttFb.getAvailableFunctionBlockTypes().get(RAW_FB_NAME).createDefaultConfig(); + config.setPropertyValue(PROPERTY_NAME_TOPIC, topic0); + auto rawFB = rootMqttFb.addFunctionBlock(RAW_FB_NAME, config); + auto singal = rawFB.getSignals()[0]; + auto reader = daq::PacketReader(singal); + + const auto stHasData = EnumerationWithIntValue(MQTT_RAW_FB_SUB_STATUS_TYPE, + static_cast(MqttBaseFb::SubscriptionStatus::HasData), + daqInstance.getContext().getTypeManager()); + + const auto stWaitData = EnumerationWithIntValue(MQTT_RAW_FB_SUB_STATUS_TYPE, + static_cast(MqttBaseFb::SubscriptionStatus::WaitingForData), + daqInstance.getContext().getTypeManager()); + + MqttAsyncClientWrapper publisher("testPublisherId"); + ASSERT_TRUE(publisher.connect("127.0.0.1")); + EXPECT_EQ(rawFB.getStatusContainer().getStatus("SubscriptionStatus"), stWaitData); + + mqtt::MqttMessage msg = {topic0, dataToSend[0], 2, 0}; + ASSERT_TRUE(publisher.publishMsg(msg)); + + auto readerLambda = [&reader, &dataToReceive]() + { + while (!reader.getEmpty()) + { + auto packet = reader.read(); + if (const auto eventPacket = packet.asPtrOrNull(); eventPacket.assigned()) + { + continue; + } + if (const auto dataPacket = packet.asPtrOrNull(); dataPacket.assigned()) + { + std::vector readData(dataPacket.getDataSize()); + memcpy(readData.data(), dataPacket.getData(), dataPacket.getDataSize()); + dataToReceive.push_back(std::move(readData)); + } + } + }; + helper::utils::Timer tmr(1000, true); + + bool hasData = false; + while (tmr.expired() == false && hasData == false) + hasData = rawFB.getStatusContainer().getStatus("SubscriptionStatus") == stHasData; + + EXPECT_TRUE(hasData); + + readerLambda(); + ASSERT_EQ(dataToReceive.size(), 1u); + ASSERT_EQ(dataToSend[0], dataToReceive[0]); + + dataToReceive.clear(); + + ASSERT_NO_THROW(rawFB.setPropertyValue(PROPERTY_NAME_TOPIC, topic1)); + EXPECT_EQ(rawFB.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager())); + EXPECT_EQ(rawFB.getStatusContainer().getStatus("SubscriptionStatus"), stWaitData); + + msg = {topic1, dataToSend[1], 2, 0}; + ASSERT_TRUE(publisher.publishMsg(msg)); + tmr.restart(); + + hasData = false; + while (tmr.expired() == false && hasData == false) + hasData = rawFB.getStatusContainer().getStatus("SubscriptionStatus") == stHasData; + + EXPECT_TRUE(hasData); + + readerLambda(); + ASSERT_EQ(dataToReceive.size(), 1u); + ASSERT_EQ(dataToSend[1], dataToReceive[0]); +} diff --git a/mqtt_streaming_protocol/include/MqttDataWrapper.h b/mqtt_streaming_protocol/include/MqttDataWrapper.h index f89d5865..c601e7fa 100644 --- a/mqtt_streaming_protocol/include/MqttDataWrapper.h +++ b/mqtt_streaming_protocol/include/MqttDataWrapper.h @@ -47,6 +47,23 @@ struct MqttMsgDescriptor class MqttDataWrapper final { public: + struct CmdResult + { + bool success = false; + std::string msg; + + CmdResult() + : success(false), + msg("") + { + } + CmdResult(bool success, const std::string& msg = "") + : success(success), + msg(msg) + { + } + }; + MqttDataWrapper(daq::LoggerComponentPtr loggerComponent); static std::string extractDeviceName(const std::string& topic); @@ -56,7 +73,7 @@ class MqttDataWrapper final static std::string buildTopicFromId(const std::string& globalId); static std::string buildSignalsTopic(const std::string& deviceId); - static bool validateTopic(const daq::StringPtr topic, const daq::LoggerComponentPtr loggerComponent = nullptr); + static CmdResult validateTopic(const daq::StringPtr topic, const daq::LoggerComponentPtr loggerComponent = nullptr); void setConfig(const std::string& config); std::vector> extractDescription(); diff --git a/mqtt_streaming_protocol/src/MqttAsyncClient.cpp b/mqtt_streaming_protocol/src/MqttAsyncClient.cpp index 10081fc2..7b502b98 100644 --- a/mqtt_streaming_protocol/src/MqttAsyncClient.cpp +++ b/mqtt_streaming_protocol/src/MqttAsyncClient.cpp @@ -34,6 +34,7 @@ MqttAsyncClient::MqttAsyncClient(std::string serverUrl, std::string clientId, bo disconnOpts.context = this; createOpts = MQTTAsync_createOptions_initializer; + createOpts.maxBufferedMessages = 1000; } MqttAsyncClient::~MqttAsyncClient() diff --git a/mqtt_streaming_protocol/src/MqttDataWrapper.cpp b/mqtt_streaming_protocol/src/MqttDataWrapper.cpp index 928d1ab7..2828544c 100644 --- a/mqtt_streaming_protocol/src/MqttDataWrapper.cpp +++ b/mqtt_streaming_protocol/src/MqttDataWrapper.cpp @@ -123,15 +123,18 @@ std::string MqttDataWrapper::buildSignalsTopic(const std::string& deviceId) return (TOPIC_ALL_SIGNALS_PREFIX + deviceId + "/" + DEVICE_SIGNAL_LIST); } -bool MqttDataWrapper::validateTopic(const daq::StringPtr topic, const daq::LoggerComponentPtr loggerComponent) +MqttDataWrapper::CmdResult MqttDataWrapper::validateTopic(const daq::StringPtr topic, const daq::LoggerComponentPtr loggerComponent) { + + MqttDataWrapper::CmdResult result(true, ""); if (!topic.assigned() || topic.getLength() == 0) { + result = MqttDataWrapper::CmdResult(false, "Empty topic is not allowed!"); if (loggerComponent.assigned()) { - LOG_W("Empty topic is not allowed!"); + LOG_W("{}", result.msg); } - return false; + return result; } std::vector list; @@ -141,15 +144,18 @@ bool MqttDataWrapper::validateTopic(const daq::StringPtr topic, const daq::Logge { if (part == "#" || part == "+") { + result = MqttDataWrapper::CmdResult(false, + fmt::format("Wildcard characters '+' and '#' are not allowed in topic: {}", + topic.toStdString())); if (loggerComponent.assigned()) { - LOG_W("Wildcard characters '+' and '#' are not allowed in topic: {}", topic.toStdString()); + LOG_W("{}", result.msg); } - return false; + return result; } } - return true; + return result; } void MqttDataWrapper::setConfig(const std::string& config) @@ -179,7 +185,7 @@ std::vector> MqttDataWrapper:: { const rapidjson::Value& array = it->value; const std::string topic = it->name.GetString(); - if (!validateTopic(topic, loggerComponent)) + if (validateTopic(topic, loggerComponent).success == false) continue; if (!array.IsArray()) { diff --git a/mqtt_streaming_protocol/tests/CMakeLists.txt b/mqtt_streaming_protocol/tests/CMakeLists.txt index a63b8278..ff839876 100644 --- a/mqtt_streaming_protocol/tests/CMakeLists.txt +++ b/mqtt_streaming_protocol/tests/CMakeLists.txt @@ -6,18 +6,23 @@ set(TEST_SOURCES test_mqtt_streaming_protocol.cpp test_app.cpp ) +set(WRP_SOURCES MqttAsyncClientWrapper.h + MqttAsyncClientWrapper.cpp +) + +source_group("mqtt_client_sync_wrapper" FILES ${WRP_SOURCES}) + add_executable(${TEST_APP} ${TEST_SOURCES} ) -add_library(${TEST_LIB} STATIC MqttAsyncClientWrapper.h - MqttAsyncClientWrapper.cpp - Timer.h +add_library(${TEST_LIB} STATIC ${WRP_SOURCES} ) target_include_directories(${TEST_LIB} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ) target_link_libraries(${TEST_LIB} PRIVATE ${MODULE_NAME} + PUBLIC mqtt_streaming_helper ) target_link_libraries(${TEST_APP} PRIVATE daq::test_utils diff --git a/mqtt_streaming_protocol/tests/MqttAsyncClientWrapper.cpp b/mqtt_streaming_protocol/tests/MqttAsyncClientWrapper.cpp index 38c0bc58..9d5c5d62 100644 --- a/mqtt_streaming_protocol/tests/MqttAsyncClientWrapper.cpp +++ b/mqtt_streaming_protocol/tests/MqttAsyncClientWrapper.cpp @@ -1,11 +1,12 @@ #include "MqttAsyncClientWrapper.h" #include "MqttAsyncClient.h" -#include "Timer.h" +#include "mqtt_streaming_helper/timer.h" #include #include #include using namespace std::chrono; +using namespace helper::utils; MqttAsyncClientWrapper::MqttAsyncClientWrapper(std::string clientId) : instance(std::make_unique()), diff --git a/mqtt_streaming_protocol/tests/test_mqtt_streaming_protocol.cpp b/mqtt_streaming_protocol/tests/test_mqtt_streaming_protocol.cpp index 1416778a..56df3619 100644 --- a/mqtt_streaming_protocol/tests/test_mqtt_streaming_protocol.cpp +++ b/mqtt_streaming_protocol/tests/test_mqtt_streaming_protocol.cpp @@ -1,6 +1,6 @@ #include "MqttAsyncClient.h" #include "MqttAsyncClientWrapper.h" -#include "Timer.h" +#include "mqtt_streaming_helper/timer.h" #include "timestampConverter.h" #include #include @@ -9,6 +9,7 @@ using namespace mqtt; using namespace std::chrono; +using namespace helper::utils; class MqttStreamingProtocolTest : public ::testing::Test, public MqttAsyncClientWrapper { protected: