Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
]
}
```
The *Timestamp’* and *Unit’* fields may be omitted. The fields inside *Unit’* may also be omitted. Example:
The *Timestamp’* and *Unit’* fields may be omitted. The fields inside *Unit’* may also be omitted. The `"Value"` and `"Timestamp"` fields support dot-separated paths for accessing nested JSON fields, e.g. `"data.temperature"` or `"info.ts"` (see section 4 for details). Example:
```json
{
"/mirip/UNet3AC2/sensor/data":[
Expand Down Expand Up @@ -130,10 +130,17 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
- **Where**: *include/mqtt_streaming_module/mqtt_json_decoder_fb_impl.h, src/mqtt_json_decoder_fb_impl.cpp*
- **Purpose**: To parse JSON string data to extract a value and a timestamp, and to send data and domain samples based on this data.
- **Main properties**:
- *ValueKey* (string) — Specifies the JSON field name from which value data will be extracted. This property is required. It should be contained in the incoming JSON messages. Otherwise, a parsing error will occur.
- *ValueKey* (string) — Specifies the JSON field name (or dot-separated path for nested objects) from which value data will be extracted. Use `.` to access nested fields, e.g. `"data.temperature"` extracts the `temperature` field from inside the `data` object. This property is required. It should be contained in the incoming JSON messages. Otherwise, a parsing error will occur.
- *DomainMode* (list) — Defines how the timestamp of the decoded signal is generated. By default it is set to *None* (0), which means that the decoded signal doesn't have a timestamp. If set to *Extract from message* (1), the JSON decoder will try to extract the timestamp from the incoming JSON messages (see *DomainKey* property). If set to *System time* (2), the timestamp of the decoded signal is set to the system time when the JSON message is received.
- *DomainKey* (string) — Specifies the JSON field name from which timestamp will be extracted. This property is optional. If it is set it should be contained in the incoming JSON messages. Otherwise, a parsing error will occur.
- *DomainKey* (string) — Specifies the JSON field name (or dot-separated path for nested objects) from which the timestamp will be extracted. Dot notation is supported, e.g. `"info.timestamp"` extracts `timestamp` from inside the `info` object. This property is optional. If it is set it should be contained in the incoming JSON messages. Otherwise, a parsing error will occur.
- *Unit* (string) — Specifies the unit symbol for the decoded value. This property is optional.

Dot-notation paths support arbitrary nesting depth. For example, `"sensor.values.temperature"` traverses `sensor` → `values` → `temperature`.
Example of a nested JSON MQTT message and the corresponding property values:
```json
{"data": {"temperature": 25.68, "humidity": 72.1}, "info": {"timestamp": 1776332277}}
```
For this message, set *ValueKey* to `"data.temperature"` and *DomainKey* to `"info.timestamp"`.
---

## Building MQTTStreamingModule
Expand Down
25 changes: 25 additions & 0 deletions modules/mqtt_streaming_module/tests/test_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,31 @@ inline const std::string MISSING_FIELD_JSON_DATA_2 = R"json({
}
)json";

inline const std::string NESTED_JSON_DATA_0 = R"json({
"data": {"temperature": <placeholder_value>},
"info": {"timestamp": <placeholder_ts>}
}
)json";

inline const std::string NESTED_JSON_DATA_1 = R"json({
"data": {"temperature": <placeholder_value>}
}
)json";

inline const std::string NESTED_JSON_DATA_2 = R"json({
"data": {"temperature": <placeholder_value>},
"timestamp": <placeholder_ts>
}
)json";

inline const std::string DEEP_NESTED_JSON_DATA = R"json({
"sensor": {
"values": {"temperature": <placeholder_value>},
"metadata": {"unit": "°C", "ts": <placeholder_ts>, "location": "lab"}
}
}
)json";

inline const std::vector<std::pair<double, uint64_t>> DATA_DOUBLE_INT_0 = {{23.50000001, 1761567115},
{-0.00000005583, 1761567116},
{19.84916651651, 1761567117},
Expand Down
150 changes: 139 additions & 11 deletions modules/mqtt_streaming_module/tests/test_mqtt_json_decoder_fb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,20 +200,86 @@ std::vector<std::string> replacePlaceholders(const std::vector<std::pair<vT, tsT
return result;
}

std::string extractFieldName(std::string jsonTemplate, const std::string& valuePh)
std::string extractFieldName(const std::string& jsonTemplate, const std::string& valuePh)
{
std::string result;
size_t pos = jsonTemplate.find(valuePh);
const size_t pos = jsonTemplate.find(valuePh);
if (pos == std::string::npos)
return "";
size_t posEnd = jsonTemplate.rfind("\"", pos);
if (posEnd == std::string::npos)
return "";
size_t posStart = jsonTemplate.rfind("\"", posEnd - 1);
if (posStart == std::string::npos)
return "";
++posStart;
result = jsonTemplate.substr(posStart, posEnd - posStart);

const auto extractKeyBefore = [&](size_t from, std::string& key) -> size_t
{
const size_t closeQ = jsonTemplate.rfind('"', from);
if (closeQ == std::string::npos)
return std::string::npos;
const size_t openQ = jsonTemplate.rfind('"', closeQ - 1);
if (openQ == std::string::npos)
return std::string::npos;
key = jsonTemplate.substr(openQ + 1, closeQ - openQ - 1);
return openQ;
};

std::vector<std::string> segments;
size_t scanFrom = std::string::npos;

{
std::string key;
scanFrom = extractKeyBefore(pos, key);
if (scanFrom == std::string::npos)
return "";
segments.push_back(key);
}

// walk up ancestor objects by finding the '{' that opens each level
while (scanFrom > 0)
{
// scan backwards for the '{' at depth 0 that opens the current object
int depth = 0;
size_t p = scanFrom;
size_t bracePos = std::string::npos;
while (p > 0)
{
--p;
const char c = jsonTemplate[p];
if (c == '}')
++depth;
else if (c == '{')
{
if (depth == 0)
{
bracePos = p;
break;
}
--depth;
}
}
if (bracePos == std::string::npos)
break;

size_t q = bracePos;
while (q > 0 && std::isspace(static_cast<unsigned char>(jsonTemplate[q - 1])))
--q;
if (q == 0 || jsonTemplate[q - 1] != ':')
break; // root object — no parent key
--q; // skip ':'

std::string ancestorKey;
const size_t ancestorOpenQ = extractKeyBefore(q, ancestorKey);
if (ancestorOpenQ == std::string::npos)
break;

segments.push_back(ancestorKey);
scanFrom = ancestorOpenQ;
}

std::reverse(segments.begin(), segments.end());

std::string result;
for (size_t i = 0; i < segments.size(); ++i)
{
if (i > 0)
result += '.';
result += segments[i];
}
return result;
}

Expand Down Expand Up @@ -1181,3 +1247,65 @@ TEST_F(MqttJsonDecoderFbTest, PacketWithTheSameTS)
EXPECT_EQ(getComponentStatus(), ok);
EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos);
}

TEST_F(MqttJsonDecoderFbTest, NestedValueFieldWithDomain)
{
auto dataToReceive = transferData(DATA_DOUBLE_INT_0, NESTED_JSON_DATA_0);
ASSERT_EQ(DATA_DOUBLE_INT_0.size(), dataToReceive.size());
ASSERT_TRUE(compareData(DATA_DOUBLE_INT_0, dataToReceive));
ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"),
Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager()));
EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos);
}

TEST_F(MqttJsonDecoderFbTest, NestedValueMixedDepth)
{
auto dataToReceive = transferData(DATA_DOUBLE_INT_2, NESTED_JSON_DATA_2);
ASSERT_EQ(DATA_DOUBLE_INT_2.size(), dataToReceive.size());
ASSERT_TRUE(compareData(DATA_DOUBLE_INT_2, dataToReceive));
ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"),
Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager()));
EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos);
}

TEST_F(MqttJsonDecoderFbTest, DeepNestedValueField)
{
auto dataToReceive = transferData(DATA_DOUBLE_INT_1, DEEP_NESTED_JSON_DATA);
ASSERT_EQ(DATA_DOUBLE_INT_1.size(), dataToReceive.size());
ASSERT_TRUE(compareData(DATA_DOUBLE_INT_1, dataToReceive));
ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"),
Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager()));
EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos);
}

TEST_F(MqttJsonDecoderFbTest, NestedValueFieldWithoutDomain)
{
auto dataToReceive = transferDataWithoutDomain(DATA_DOUBLE_INT_1, NESTED_JSON_DATA_1);
ASSERT_EQ(DATA_DOUBLE_INT_1.size(), dataToReceive.size());
ASSERT_TRUE(compareData(DATA_DOUBLE_INT_1, dataToReceive));
ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"),
Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager()));
EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos);
}

TEST_F(MqttJsonDecoderFbTest, NestedMissingField)
{
const auto topic = buildTopicName();
CreateDecoderFB(topic, "data.nonexistent", DDSM::None, "");

auto signal = getSignals()[0];
auto reader = daq::PacketReader(signal);

auto msgs = replacePlaceholders(DATA_DOUBLE_INT_0, DEEP_NESTED_JSON_DATA);
for (const auto& str : msgs)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
onSignalsMessage({topic, std::vector<uint8_t>(str.begin(), str.end()), 1, 0});
}

auto dataToReceive = read<double>(reader, signal, 1000);
ASSERT_EQ(0u, dataToReceive.size());
ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"),
Enumeration("ComponentStatusType", "Error", decoderObj.getContext().getTypeManager()));
EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing failed"), std::string::npos);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class MqttDataWrapper final

bool parseJson(ExtractionContext& ctx, const std::string& json);
bool parseJsonFields(ExtractionContext& ctx);
bool extractValue(ExtractionContext& ctx, const std::string& jsonFieldName);
bool extractTimestamp(ExtractionContext& ctx, const std::string& jsonFieldName);
bool extractValue(ExtractionContext& ctx, const rapidjson::Value& node);
bool extractTimestamp(ExtractionContext& ctx, const rapidjson::Value& node);
bool validateExtractionResult(ExtractionContext& ctx);
bool buildPackets(ExtractionContext& ctx, const uint64_t externalTs);

Expand Down
Loading
Loading