Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ if(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
endif()

add_subdirectory(external)
add_subdirectory(helper_utils)
add_subdirectory(mqtt_streaming_protocol)
add_subdirectory(mqtt_streaming_module)

Expand Down
56 changes: 22 additions & 34 deletions examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ using namespace daq;

struct ConfigStruct {
std::string brokerAddress;
std::vector<std::string> topics;
std::string topic;
bool exit = true;
int error = 0;
};
Expand All @@ -20,7 +20,7 @@ ConfigStruct StartUp(int argc, char* argv[])
InputArgs args;
args.addArg("--help", "Show help message");
args.addArg("--address", "MQTT broker address", true);
args.setUsageHelp(APP_NAME " [options] <topic1> <topic2> ... <topicN>");
args.setUsageHelp(APP_NAME " [options] <topic>");
args.parse(argc, argv);

if (args.hasArg("--help") || args.hasUnknownArgs())
Expand All @@ -31,14 +31,14 @@ ConfigStruct StartUp(int argc, char* argv[])
}

config.brokerAddress = args.getArgValue("--address", "127.0.0.1");
config.topics = args.getPositionalArgs();

if (config.topics.empty())
const auto positionalArgs = args.getPositionalArgs();
if (positionalArgs.empty())
{
std::cout << "MQTT topics are required." << std::endl;
std::cout << "An MQTT topic is required." << std::endl;
config.error = -1;
return config;
}
config.topic = args.getPositionalArgs()[0];;

config.exit = false;
return config;
Expand Down Expand Up @@ -66,46 +66,34 @@ int main(int argc, char* argv[])

// Create RAW function block configuration
auto config = availableFbs.get(fbName).createDefaultConfig();
auto topicList = List<IString>();
for (auto& topic : appConfig.topics)
{
addToList(topicList, std::move(topic));
}
config.setPropertyValue("SignalList", topicList);
config.setPropertyValue("Topic", appConfig.topic);

// Add the RAW function block to the broker FB
daq::FunctionBlockPtr rawFb = brokerFB.addFunctionBlock(fbName, config);

// Create packet readers for all signals
const auto signals = rawFb.getSignals();
std::map<std::string, PacketReaderPtr> readers;
for (const auto& s : signals)
{
readers.emplace(std::pair<std::string, PacketReaderPtr>(s.getName().toStdString(), daq::PacketReader(s)));
}
// Create packet readers for a signal
const auto signal = rawFb.getSignals()[0];
PacketReaderPtr reader = daq::PacketReader(signal);

// Start a thread to read packets from the readers
// Start a thread to read packets from the reader
std::atomic<bool> running = true;
std::thread readerThread(
[&readers, &running]()
[&reader, &signal, &running]()
{
while (running)
{
for (const auto& [signalName, reader] : readers)
while (!reader.getEmpty() && running)
{
while (!reader.getEmpty() && running)
auto packet = reader.read();
if (packet.getType() == PacketType::Event)
{
continue;
}
else if (packet.getType() == PacketType::Data)
{
auto packet = reader.read();
if (packet.getType() == PacketType::Event)
{
continue;
}
else if (packet.getType() == PacketType::Data)
{
const auto dataPacket = packet.asPtr<IDataPacket>();
std::string dataStr(static_cast<char*>(dataPacket.getData()), dataPacket.getDataSize());
std::cout << signalName << " - " << dataStr << std::endl;
}
const auto dataPacket = packet.asPtr<IDataPacket>();
std::string dataStr(static_cast<char*>(dataPacket.getData()), dataPacket.getDataSize());
std::cout << signal.getName() << " - " << dataStr << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
Expand Down
19 changes: 19 additions & 0 deletions helper_utils/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
cmake_minimum_required(VERSION 3.10)

set(HELPER_LIB mqtt_streaming_helper)

set(HELPER_PRJ_VERSION "1.0.0")
set(HELPER_PRJ_NAME "HelperUtils")

project(${HELPER_PRJ_NAME} VERSION ${HELPER_PRJ_VERSION} LANGUAGES C CXX)



add_library(${HELPER_LIB} INTERFACE)

target_include_directories(${HELPER_LIB} INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/include
)

target_sources(${HELPER_LIB} INTERFACE
${CMAKE_CURRENT_SOURCE_DIR}/include/mqtt_streaming_helper/timer.h
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,49 @@

#include <chrono>

namespace helper::utils
{
class Timer
{
public:
Timer(int ms)
Timer(size_t ms, bool start = true)
: period(ms),
firstStart(true)
{
start = std::chrono::steady_clock::now();
timeout = std::chrono::milliseconds(ms);
if (start)
restart();
}

std::chrono::milliseconds remain() const
{
auto now = std::chrono::steady_clock::now();
const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
std::chrono::milliseconds newTout = (elapsed_ms >= timeout) ? std::chrono::milliseconds(0) : timeout - elapsed_ms;
return newTout;
}

bool expired()
{
return remain() == std::chrono::milliseconds(0);
return (firstStart) ? true : (remain() == std::chrono::milliseconds(0));
}

explicit operator std::chrono::milliseconds() const noexcept
{
return remain();
}

void restart()
{
firstStart = false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like that assignment should go last in this method - after start value is initialized

start = std::chrono::steady_clock::now();
timeout = std::chrono::milliseconds(period);
}

protected:
std::chrono::steady_clock::time_point start;
std::chrono::milliseconds timeout;
std::chrono::milliseconds period;
bool firstStart;
};

} // namespace helper::utils
11 changes: 11 additions & 0 deletions mqtt_streaming_module/include/mqtt_streaming_module/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ static constexpr uint32_t DEFAULT_INIT_TIMEOUT = 3000; // ms
static constexpr uint32_t DEFAULT_PUB_READ_PERIOD = 20; // ms
static constexpr uint32_t DEFAULT_PUB_QOS = 1;
static constexpr uint32_t DEFAULT_PUB_PACK_SIZE = 1;
static constexpr const char* DEFAULT_PUB_TOPIC_NAME = "";

static constexpr const char* PROPERTY_NAME_MQTT_BROKER_ADDRESS = "MqttBrokerAddress";
static constexpr const char* PROPERTY_NAME_MQTT_BROKER_PORT = "MqttBrokerPort";
static constexpr const char* PROPERTY_NAME_MQTT_USERNAME = "MqttUsername";
static constexpr const char* PROPERTY_NAME_MQTT_PASSWORD = "MqttPassword";
static constexpr const char* PROPERTY_NAME_CONNECT_TIMEOUT = "ConnectTimeout";
static constexpr const char* PROPERTY_NAME_SIGNAL_LIST = "SignalList";
static constexpr const char* PROPERTY_NAME_TOPIC = "Topic";

static constexpr const char* PROPERTY_NAME_PUB_TOPIC_MODE = "TopicMode";
static constexpr const char* PROPERTY_NAME_PUB_TOPIC_NAME = "Topic";
static constexpr const char* PROPERTY_NAME_PUB_SHARED_TS = "SharedTimestamp";
static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES = "GroupValues";
static constexpr const char* PROPERTY_NAME_PUB_USE_SIGNAL_NAMES = "UseSignalNames";
Expand All @@ -40,8 +43,16 @@ static constexpr const char* ROOT_FB_NAME = "@rootMqttFb";

static const char* MQTT_LOCAL_ROOT_FB_ID_PREFIX = "rootMqttFb";
static const char* MQTT_LOCAL_PUB_FB_ID_PREFIX = "publisherMqttFb";
static const char* MQTT_LOCAL_RAW_FB_ID_PREFIX = "rawMqttFb";
static const char* MQTT_LOCAL_JSON_FB_ID_PREFIX = "jsonMqttFb";


static const char* MQTT_ROOT_FB_CON_STATUS_TYPE = "BrokerConnectionStatusType";
static const char* MQTT_RAW_FB_SUB_STATUS_TYPE = "MqttSubscriptionStatusType";
static const char* MQTT_PUB_FB_SIG_STATUS_TYPE = "MqttSignalStatusType";
static const char* MQTT_PUB_FB_PUB_STATUS_TYPE = "MqttPublishingStatusType";

static const char* MQTT_PUB_FB_SIG_STATUS_NAME = "SignalStatus";
static const char* MQTT_PUB_FB_PUB_STATUS_NAME = "PublishingStatus";

END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class HandlerFactory
{
if (config.sharedTs)
{
return std::make_unique<MultipleSharedHandler>(config.useSignalNames, publisherFbGlobalId);
return std::make_unique<MultipleSharedHandler>(config.useSignalNames,
config.topicName.empty() ? publisherFbGlobalId : config.topicName);
}
else if (config.topicMode == TopicMode::Single)
{
Expand All @@ -42,7 +43,8 @@ class HandlerFactory
}
else if (config.topicMode == TopicMode::Multi)
{
return std::make_unique<MultipleHandler>(config.useSignalNames, publisherFbGlobalId);
return std::make_unique<MultipleHandler>(config.useSignalNames,
config.topicName.empty() ? publisherFbGlobalId : config.topicName);
}

return std::make_unique<SingleHandler>(config.useSignalNames);
Expand Down
39 changes: 35 additions & 4 deletions mqtt_streaming_module/include/mqtt_streaming_module/mqtt_base_fb.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
class MqttBaseFb : public FunctionBlock
{
public:
enum class SubscriptionStatus : EnumType
{
InvalidTopicName = 0,
SubscribingError,
WaitingForData,
HasData
};

struct CmdResult
{
bool success = false;
std::string msg;
int token = 0;

CmdResult(bool success = false, const std::string& msg = "", int token = 0)
: success(success),
msg(msg),
token(token)
{
}
};


explicit MqttBaseFb(const ContextPtr& ctx,
const ComponentPtr& parent,
const FunctionBlockTypePtr& type,
Expand All @@ -34,7 +57,10 @@ class MqttBaseFb : public FunctionBlock
~MqttBaseFb() = default;

protected:
static std::vector<std::pair<SubscriptionStatus, std::string>> subscriptionStatusMap;

std::shared_ptr<mqtt::MqttAsyncClient> subscriber;
EnumerationPtr subscriptionStatus;

virtual void createSignals() = 0;
virtual void processMessage(const mqtt::MqttMessage& msg) = 0;
Expand All @@ -44,12 +70,17 @@ class MqttBaseFb : public FunctionBlock

void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, const mqtt::MqttMessage& msg);

virtual std::vector<std::string> getSubscribedTopics() const = 0;
virtual void clearSubscribedTopics() = 0;
virtual void subscribeToTopics();
virtual void unsubscribeFromTopics();
virtual std::string getSubscribedTopic() const = 0;
virtual void clearSubscribedTopic() = 0;
virtual CmdResult subscribeToTopic();
virtual CmdResult unsubscribeFromTopic();

virtual void propertyChanged() = 0;

void removed() override;

void initSubscriptionStatus();
void setSubscriptionStatus(const SubscriptionStatus status, std::string message = "");
};

END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class MqttJsonReceiverFbImpl final : public MqttBaseFb
explicit MqttJsonReceiverFbImpl(const ContextPtr& ctx,
const ComponentPtr& parent,
const FunctionBlockTypePtr& type,
const StringPtr& localId,
std::shared_ptr<mqtt::MqttAsyncClient> subscriber,
const PropertyObjectPtr& config = nullptr);
~MqttJsonReceiverFbImpl() override;
Expand All @@ -42,14 +41,19 @@ class MqttJsonReceiverFbImpl final : public MqttBaseFb
mutable std::mutex sync;
mqtt::MqttDataWrapper jsonDataWorker;
std::unordered_map<mqtt::SignalId, SignalConfigPtr> outputSignals;
std::vector<mqtt::SignalId> signalIdList;
std::unordered_map<mqtt::SignalId, DataDescriptorPtr> subscribedSignals;
std::vector<std::string> signalNameList;
std::unordered_map<std::string, DataDescriptorPtr> subscribedSignals;
std::string topicForSubscribing;
static std::atomic<int> localIndex;

static std::string getLocalId();

void createSignals() override;
void clearSubscribedTopics() override;
std::vector<std::string> getSubscribedTopics() const override;
void clearSubscribedTopic() override;
std::string getSubscribedTopic() const override;
void processMessage(const mqtt::MqttMessage& msg) override;
void readProperties() override;
void propertyChanged() override;

void createDataPacket(const std::string& topic, const std::string& json);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "MqttAsyncClient.h"
#include "MqttDataWrapper.h"
#include "mqtt_streaming_module/handler_base.h"
#include "mqtt_streaming_helper/timer.h"
#include <mqtt_streaming_module/common.h>
#include <mqtt_streaming_module/types.h>
#include <opendaq/function_block_impl.h>
Expand All @@ -27,6 +28,19 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
class MqttPublisherFbImpl final : public FunctionBlock
{
public:
enum class SignalStatus : EnumType
{
NotConnected = 0,
Invalid,
Valid
};

enum class PublishingStatus : EnumType
{
Ok = 0,
SampleSkipped
};

explicit MqttPublisherFbImpl(const ContextPtr& ctx,
const ComponentPtr& parent,
const FunctionBlockTypePtr& type,
Expand All @@ -40,7 +54,12 @@ class MqttPublisherFbImpl final : public FunctionBlock
void onConnected(const InputPortPtr& port) override;
void onDisconnected(const InputPortPtr& port) override;

static void addTypesToTypeManager(daq::TypeManagerPtr manager);

private:
static const std::vector<std::pair<SignalStatus, std::string>> signalStatusMap;
static const std::vector<std::pair<PublishingStatus, std::string>> publishingStatusMap;

static std::atomic<int> localIndex;
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient;
mqtt::MqttDataWrapper jsonDataWorker;
Expand All @@ -51,10 +70,20 @@ class MqttPublisherFbImpl final : public FunctionBlock
std::atomic<bool> running;
std::atomic<bool> hasError;
std::unique_ptr<HandlerBase> handler;
EnumerationPtr signalStatus;
EnumerationPtr publishingStatus;
uint64_t skippedMsgCnt;
uint64_t publishedMsgCnt;
Comment thread
NikolaiShipilov marked this conversation as resolved.
std::string lastSkippedReason;
helper::utils::Timer publishingStatusTimer;

static std::string getLocalId();
void setSignalStatus(const SignalStatus status, std::string message = "", bool init = false);
void setPublishingStatus(const PublishingStatus status, std::string message = "", bool init = false);
void updatePublishingStatus();
void initProperties(const PropertyObjectPtr& config);
void readProperties();
void propertyChanged();
void updateInputPorts();
void validateInputPorts();
template <typename retT, typename intfT>
Expand Down
Loading