Skip to content

Commit

Permalink
Merge pull request #8009 from ShvaykaD/bugfix/7998
Browse files Browse the repository at this point in the history
[3.4.4] rollback "tellFailure" logic change in TbAbstractGetAttributesNode
  • Loading branch information
ashvayka committed Jan 30, 2023
2 parents 732a016 + 2b5e326 commit 9ff4a1d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 46 deletions.
Expand Up @@ -110,9 +110,6 @@ private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap)
);
withCallback(allFutures, futuresList -> {
if (!failuresMap.isEmpty()) {
throw reportFailures(failuresMap);
}
TbMsgMetaData msgMetaData = msg.getMetaData().copy();
futuresList.stream().filter(Objects::nonNull).forEach(kvEntriesMap -> {
kvEntriesMap.forEach((keyScope, kvEntryList) -> {
Expand All @@ -127,10 +124,13 @@ private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
});
});
});
if (fetchToData) {
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msgMetaData, JacksonUtil.toString(msgDataNode)));
TbMsg outMsg = fetchToData ?
TbMsg.transformMsgData(msg, JacksonUtil.toString(msgDataNode)) :
TbMsg.transformMsg(msg, msgMetaData);
if (failuresMap.isEmpty()) {
ctx.tellSuccess(outMsg);
} else {
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msgMetaData, msg.getData()));
ctx.tellFailure(outMsg, reportFailures(failuresMap));
}
}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
}
Expand Down
Expand Up @@ -134,16 +134,16 @@ public void fetchToMetadata_whenOnMsg_then_success() throws Exception {
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);

TbMsg resultMsg = checkMsg();
TbMsgMetaData msgMetaData = resultMsg.getMetaData();
// check msg
TbMsg resultMsg = checkMsg(true);

//check attributes
checkAttributes(clientAttributes, "cs_", false, msgMetaData, null);
checkAttributes(serverAttributes, "ss_", false, msgMetaData, null);
checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null);
checkAttributes(resultMsg, false, "cs_", clientAttributes);
checkAttributes(resultMsg, false, "ss_", serverAttributes);
checkAttributes(resultMsg, false, "shared_", sharedAttributes);

//check timeseries
checkTs(tsKeys, false, false, msgMetaData, null);
checkTs(resultMsg, false, false, tsKeys);
}

@Test
Expand All @@ -152,16 +152,16 @@ public void fetchToMetadata_latestWithTs_whenOnMsg_then_success() throws Excepti
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);

TbMsg resultMsg = checkMsg();
TbMsgMetaData msgMetaData = resultMsg.getMetaData();
// check msg
TbMsg resultMsg = checkMsg(true);

//check attributes
checkAttributes(clientAttributes, "cs_", false, msgMetaData, null);
checkAttributes(serverAttributes, "ss_", false, msgMetaData, null);
checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null);
checkAttributes(resultMsg, false, "cs_", clientAttributes);
checkAttributes(resultMsg, false, "ss_", serverAttributes);
checkAttributes(resultMsg, false, "shared_", sharedAttributes);

//check timeseries with ts
checkTs(tsKeys, false, true, msgMetaData, null);
checkTs(resultMsg, false, true, tsKeys);
}

@Test
Expand All @@ -170,16 +170,16 @@ public void fetchToData_whenOnMsg_then_success() throws Exception {
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);

TbMsg resultMsg = checkMsg();
JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData());
// check msg
TbMsg resultMsg = checkMsg(true);

//check attributes
checkAttributes(clientAttributes, "cs_", true, null, msgData);
checkAttributes(serverAttributes, "ss_", true, null, msgData);
checkAttributes(sharedAttributes, "shared_", true, null, msgData);
checkAttributes(resultMsg, true, "cs_", clientAttributes);
checkAttributes(resultMsg, true, "ss_", serverAttributes);
checkAttributes(resultMsg, true, "shared_", sharedAttributes);

//check timeseries
checkTs(tsKeys, true, false, null, msgData);
checkTs(resultMsg, true, false, tsKeys);
}

@Test
Expand All @@ -188,16 +188,34 @@ public void fetchToData_latestWithTs_whenOnMsg_then_success() throws Exception {
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);

TbMsg resultMsg = checkMsg();
JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData());
// check msg
TbMsg resultMsg = checkMsg(true);

//check attributes
checkAttributes(clientAttributes, "cs_", true, null, msgData);
checkAttributes(serverAttributes, "ss_", true, null, msgData);
checkAttributes(sharedAttributes, "shared_", true, null, msgData);
checkAttributes(resultMsg, true, "cs_", clientAttributes);
checkAttributes(resultMsg, true, "ss_", serverAttributes);
checkAttributes(resultMsg, true, "shared_", sharedAttributes);

//check timeseries with ts
checkTs(tsKeys, true, true, null, msgData);
checkTs(resultMsg, true, true, tsKeys);
}

@Test
public void fetchToMetadata_whenOnMsg_then_failure() throws Exception {
TbGetAttributesNode node = initNode(false, false, true);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);

// check msg
TbMsg actualMsg = checkMsg(false);

//check attributes
checkAttributes(actualMsg, false, "cs_", clientAttributes);
checkAttributes(actualMsg, false, "ss_", serverAttributes);
checkAttributes(actualMsg, false, "shared_", sharedAttributes);

//check timeseries with ts
checkTs(actualMsg, false, false, tsKeys);
}

@Test
Expand All @@ -206,33 +224,46 @@ public void fetchToData_whenOnMsg_then_failure() throws Exception {
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);

ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
Mockito.verify(ctx, never()).tellSuccess(any());
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture());
// check msg
TbMsg actualMsg = checkMsg(false);

Assert.assertSame(newMsgCaptor.getValue(), msg);
Assert.assertNotNull(exceptionCaptor.getValue());
//check attributes
checkAttributes(actualMsg, true, "cs_", clientAttributes);
checkAttributes(actualMsg, true, "ss_", serverAttributes);
checkAttributes(actualMsg, true, "shared_", sharedAttributes);

//check timeseries with ts
checkTs(actualMsg, true, true, tsKeys);
}

@Test
public void fetchToData_whenOnMsg_then_data_not_object_failure() throws Exception {
public void fetchToData_whenOnMsg_and_data_is_not_object_then_failure() throws Exception {
TbGetAttributesNode node = initNode(true, true, true);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), "[]");
node.onMsg(ctx, msg);

ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class);
Mockito.verify(ctx, never()).tellSuccess(any());
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture());

Assert.assertSame(newMsgCaptor.getValue(), msg);
Assert.assertSame(msg, newMsgCaptor.getValue());
Assert.assertNotNull(exceptionCaptor.getValue());
}

private TbMsg checkMsg() {
private TbMsg checkMsg(boolean checkSuccess) {
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
if (checkSuccess) {
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
} else {
ArgumentCaptor<RuntimeException> exceptionCaptor = ArgumentCaptor.forClass(RuntimeException.class);
Mockito.verify(ctx, never()).tellSuccess(any());
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(msgCaptor.capture(), exceptionCaptor.capture());
RuntimeException exception = exceptionCaptor.getValue();
Assert.assertNotNull(exception);
Assert.assertNotNull(exception.getMessage());
Assert.assertTrue(exception.getMessage().startsWith("The following attribute/telemetry keys is not present in the DB:"));
}

TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Expand All @@ -241,22 +272,24 @@ private TbMsg checkMsg() {
return resultMsg;
}

private void checkAttributes(List<String> attributes, String prefix, boolean fetchToData, TbMsgMetaData msgMetaData, JsonNode msgData) {
private void checkAttributes(TbMsg actualMsg, boolean fetchToData, String prefix, List<String> attributes) {
JsonNode msgData = JacksonUtil.toJsonNode(actualMsg.getData());
attributes.stream()
.filter(attribute -> !attribute.equals("unknown"))
.forEach(attribute -> {
String result;
if (fetchToData) {
result = msgData.get(prefix + attribute).asText();
} else {
result = msgMetaData.getValue(prefix + attribute);
result = actualMsg.getMetaData().getValue(prefix + attribute);
}
Assert.assertNotNull(result);
Assert.assertEquals(attribute + "_value", result);
});
}

private void checkTs(List<String> tsKeys, boolean fetchToData, boolean getLatestValueWithTs, TbMsgMetaData msgMetaData, JsonNode msgData) {
private void checkTs(TbMsg actualMsg, boolean fetchToData, boolean getLatestValueWithTs, List<String> tsKeys) {
JsonNode msgData = JacksonUtil.toJsonNode(actualMsg.getData());
long value = 1L;
for (String key : tsKeys) {
if (key.equals("unknown")) {
Expand All @@ -272,7 +305,7 @@ private void checkTs(List<String> tsKeys, boolean fetchToData, boolean getLatest
if (fetchToData) {
actualValue = JacksonUtil.toString(msgData.get(key));
} else {
actualValue = msgMetaData.getValue(key);
actualValue = actualMsg.getMetaData().getValue(key);
}
Assert.assertNotNull(actualValue);
Assert.assertEquals(expectedValue, actualValue);
Expand Down Expand Up @@ -303,7 +336,7 @@ private TbMsg getTbMsg(EntityId entityId) {
msgMetaData.putValue("client_attr_metadata", "client_attr_3");
msgMetaData.putValue("server_attr_metadata", "server_attr_3");

return TbMsg.newMsg("TEST", entityId, msgMetaData, msgData.toString());
return TbMsg.newMsg("TEST", entityId, msgMetaData, JacksonUtil.toString(msgData));
}

private List<String> getAttributeNames(String prefix) {
Expand Down

0 comments on commit 9ff4a1d

Please sign in to comment.