From f96329360b9857e77fc68ffdd53b3d61b90a181c Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Tue, 30 Aug 2022 21:11:37 +0400 Subject: [PATCH 1/3] Customized message type from message's properties --- README.md | 3 + .../com/exactpro/th2/codec/csv/CsvCodec.java | 18 +++++- .../codec/csv/cfg/CsvCodecConfiguration.java | 11 ++++ .../exactpro/th2/codec/csv/TestCsvCodec.java | 59 ++++++++++++++++++- 4 files changed, 89 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0044210..7e939d0 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ encoding: UTF-8 display-name: CodecCsv validate-length: true publish-header: false +th2.csv.override_message_type: "message_type_prop_name" ``` **default-header** - the default header for this codec. It will be used if no header found in the received batch. @@ -80,6 +81,8 @@ The default value for the name is `CodecCsv`. **publish-header** - set to publish decoded header. The default value is `false`. +**th2.csv.override_message_type** - name of property in incoming message containing message type. The default values are `message_type` and `MESSAGE_TYPE` + ## Full configuration example ```yaml diff --git a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java index 140f3d9..f56dacf 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java +++ b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java @@ -55,6 +55,9 @@ public class CsvCodec implements IPipelineCodec { private static final String CSV_MESSAGE_TYPE = "Csv_Message"; private static final String HEADER_FIELD_NAME = "Header"; + private static final String DEFAULT_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "message_type"; + private static final String DEFAULT_MESSAGE_TYPE_PROP_NAME_UPPERCASE = "MESSAGE_TYPE"; + private final CsvCodecConfiguration configuration; private final String[] defaultHeader; private final Charset charset; @@ -137,6 +140,19 @@ public void close() { private void decodeCsvData(Collection errors, MessageGroup.Builder groupBuilder, RawMessage rawMessage, Iterable data) { RawMessageMetadata originalMetadata = rawMessage.getMetadata(); + final String outputMessageType; + if (configuration.getMessageTypePropertyName() != null) { + outputMessageType = originalMetadata.getPropertiesOrDefault(configuration.getMessageTypePropertyName(), CSV_MESSAGE_TYPE); + } else { + if (originalMetadata.containsProperties(DEFAULT_MESSAGE_TYPE_PROP_NAME_LOWERCASE)) { + outputMessageType = originalMetadata.getPropertiesOrThrow(DEFAULT_MESSAGE_TYPE_PROP_NAME_LOWERCASE); + } else if (originalMetadata.containsProperties(DEFAULT_MESSAGE_TYPE_PROP_NAME_UPPERCASE)) { + outputMessageType = originalMetadata.getPropertiesOrThrow(DEFAULT_MESSAGE_TYPE_PROP_NAME_UPPERCASE); + } else { + outputMessageType = CSV_MESSAGE_TYPE; + } + } + int currentIndex = 0; String[] header = defaultHeader; for (String[] strings : data) { @@ -184,7 +200,7 @@ private void decodeCsvData(Collection errors, MessageGroup.Builder Builder builder = Message.newBuilder(); // Not set message type - setMetadata(originalMetadata, builder, CSV_MESSAGE_TYPE, currentIndex); + setMetadata(originalMetadata, builder, outputMessageType, currentIndex); int headerLength = header.length; int rowLength = strings.length; diff --git a/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java b/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java index f1afa3d..e063702 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java +++ b/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java @@ -48,6 +48,9 @@ public class CsvCodecConfiguration implements IPipelineCodecSettings { @JsonPropertyDescription("Set to enable header publication") private boolean publishHeader = false; + @JsonProperty("th2.csv.override_message_type") + private String messageTypePropertyName = null; + public List getDefaultHeader() { return defaultHeader; } @@ -95,4 +98,12 @@ public boolean isPublishHeader() { public void setPublishHeader(boolean publishHeader) { this.publishHeader = publishHeader; } + + public String getMessageTypePropertyName() { + return messageTypePropertyName; + } + + public void setMessageTypePropertyName(String messageTypePropertyName) { + this.messageTypePropertyName = messageTypePropertyName; + } } diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java index c474b7c..bebc6ff 100644 --- a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; @@ -79,6 +80,7 @@ void decodeArrayWithDifferentLength() throws IOException { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> { @@ -122,6 +124,7 @@ void decodeArrayInEnd() throws IOException { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -161,6 +164,7 @@ void decodeArrayInMiddle() throws IOException { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> { @@ -202,6 +206,7 @@ void decodesDataAndSkipsHeader() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -230,6 +235,48 @@ void skipsHeaderPublishing() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message)) + ) + ); + } + + @Test + void settingMessageTypeFromIncomingMessageDefaultPropName() { + settingMessageTypeFromIncomingMessage(null, "MESSAGE_TYPE"); + } + + @Test + void settingMessageTypeFromIncomingMessageConfigPropName() { + settingMessageTypeFromIncomingMessage("config_prop_name", "config_prop_name"); + } + + private void settingMessageTypeFromIncomingMessage(String configPropName, String messagePropName) { + final var config = new CsvCodecConfiguration(); + config.setPublishHeader(false); + config.setMessageTypePropertyName(configPropName); + CsvCodec codec = createCodec(config); + final var csvMessage = createCsvMessage( Map.of(messagePropName, "CSV"), "A,B,C", "1,2,3"); + + MessageGroup group = MessageGroup.newBuilder() + .addMessages(csvMessage) + .build(); + + MessageGroup value = codec.decode(group); + assertEquals(1, value.getMessagesCount()); + + Message message = getMessage(value, 0); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("CSV", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -265,6 +312,7 @@ void trimsEndOfTheLine() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -293,6 +341,7 @@ void decodesDataUsingDefaultHeader() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(1, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -330,6 +379,7 @@ void decodesDataWithEscapedCharacters() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("\"value\"", getFieldValue(message, "B", () -> "No field B. " + message)) @@ -370,6 +420,7 @@ void decodesDataCustomDelimiter() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("3", getFieldValue(message, "B", () -> "No field B. " + message)) @@ -407,6 +458,7 @@ void trimsWhitespacesDuringDecoding() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -461,10 +513,15 @@ private CsvCodec createCodec(CsvCodecConfiguration configuration) { } private AnyMessage createCsvMessage(String... data) { + return createCsvMessage(Map.of(), data); + } + + private AnyMessage createCsvMessage(Map metadataProps, String... data) { Builder builder = RawMessage.newBuilder() .setBody(ByteString.copyFrom(String.join(StringUtils.LF, data).getBytes(StandardCharsets.UTF_8))); RawMessageMetadata.Builder metadataBuilder = RawMessageMetadata.newBuilder() - .setId(MessageID.newBuilder().setSequence(System.nanoTime()).build()); + .setId(MessageID.newBuilder().setSequence(System.nanoTime()).build()) + .putAllProperties(metadataProps); builder.setMetadata(metadataBuilder.build()); return AnyMessage.newBuilder().setRawMessage(builder).build(); } From caec344076b58ad9632a907c662c59fdda5d07d4 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Wed, 31 Aug 2022 12:26:53 +0400 Subject: [PATCH 2/3] message type from th2.csv.override_message_type property --- README.md | 5 ++--- .../com/exactpro/th2/codec/csv/CsvCodec.java | 16 ++-------------- .../th2/codec/csv/cfg/CsvCodecConfiguration.java | 11 ----------- .../com/exactpro/th2/codec/csv/TestCsvCodec.java | 16 ++++------------ 4 files changed, 8 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 7e939d0..7b11bff 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ The codec decodes each raw message in the received batch. Each raw message might contain several line in CSV format. If the default header parameter is not set the codec trites the first line from the raw message as a header. Otherwise, the default header will be used for decoding the rest of data. +Output message type is taken from `th2.csv.override_message_type` property in input message. +If the property missing, the default value (`Csv_Message`) for output message type is used. If no data was decoded from raw message, the message will be skipped, and an error event will be reported. @@ -66,7 +68,6 @@ encoding: UTF-8 display-name: CodecCsv validate-length: true publish-header: false -th2.csv.override_message_type: "message_type_prop_name" ``` **default-header** - the default header for this codec. It will be used if no header found in the received batch. @@ -81,8 +82,6 @@ The default value for the name is `CodecCsv`. **publish-header** - set to publish decoded header. The default value is `false`. -**th2.csv.override_message_type** - name of property in incoming message containing message type. The default values are `message_type` and `MESSAGE_TYPE` - ## Full configuration example ```yaml diff --git a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java index f56dacf..acc2709 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java +++ b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java @@ -55,8 +55,7 @@ public class CsvCodec implements IPipelineCodec { private static final String CSV_MESSAGE_TYPE = "Csv_Message"; private static final String HEADER_FIELD_NAME = "Header"; - private static final String DEFAULT_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "message_type"; - private static final String DEFAULT_MESSAGE_TYPE_PROP_NAME_UPPERCASE = "MESSAGE_TYPE"; + private static final String OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type"; private final CsvCodecConfiguration configuration; private final String[] defaultHeader; @@ -140,18 +139,7 @@ public void close() { private void decodeCsvData(Collection errors, MessageGroup.Builder groupBuilder, RawMessage rawMessage, Iterable data) { RawMessageMetadata originalMetadata = rawMessage.getMetadata(); - final String outputMessageType; - if (configuration.getMessageTypePropertyName() != null) { - outputMessageType = originalMetadata.getPropertiesOrDefault(configuration.getMessageTypePropertyName(), CSV_MESSAGE_TYPE); - } else { - if (originalMetadata.containsProperties(DEFAULT_MESSAGE_TYPE_PROP_NAME_LOWERCASE)) { - outputMessageType = originalMetadata.getPropertiesOrThrow(DEFAULT_MESSAGE_TYPE_PROP_NAME_LOWERCASE); - } else if (originalMetadata.containsProperties(DEFAULT_MESSAGE_TYPE_PROP_NAME_UPPERCASE)) { - outputMessageType = originalMetadata.getPropertiesOrThrow(DEFAULT_MESSAGE_TYPE_PROP_NAME_UPPERCASE); - } else { - outputMessageType = CSV_MESSAGE_TYPE; - } - } + final String outputMessageType = originalMetadata.getPropertiesOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE); int currentIndex = 0; String[] header = defaultHeader; diff --git a/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java b/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java index e063702..f1afa3d 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java +++ b/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java @@ -48,9 +48,6 @@ public class CsvCodecConfiguration implements IPipelineCodecSettings { @JsonPropertyDescription("Set to enable header publication") private boolean publishHeader = false; - @JsonProperty("th2.csv.override_message_type") - private String messageTypePropertyName = null; - public List getDefaultHeader() { return defaultHeader; } @@ -98,12 +95,4 @@ public boolean isPublishHeader() { public void setPublishHeader(boolean publishHeader) { this.publishHeader = publishHeader; } - - public String getMessageTypePropertyName() { - return messageTypePropertyName; - } - - public void setMessageTypePropertyName(String messageTypePropertyName) { - this.messageTypePropertyName = messageTypePropertyName; - } } diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java index bebc6ff..5ad05c4 100644 --- a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java @@ -245,21 +245,13 @@ void skipsHeaderPublishing() { } @Test - void settingMessageTypeFromIncomingMessageDefaultPropName() { - settingMessageTypeFromIncomingMessage(null, "MESSAGE_TYPE"); - } - - @Test - void settingMessageTypeFromIncomingMessageConfigPropName() { - settingMessageTypeFromIncomingMessage("config_prop_name", "config_prop_name"); - } + void settingMessageTypeFromIncomingMessage() { + final var CUSTOM_TYPE = "csv_test_type"; - private void settingMessageTypeFromIncomingMessage(String configPropName, String messagePropName) { final var config = new CsvCodecConfiguration(); config.setPublishHeader(false); - config.setMessageTypePropertyName(configPropName); CsvCodec codec = createCodec(config); - final var csvMessage = createCsvMessage( Map.of(messagePropName, "CSV"), "A,B,C", "1,2,3"); + final var csvMessage = createCsvMessage(Map.of("th2.csv.override_message_type", CUSTOM_TYPE), "A,B,C", "1,2,3"); MessageGroup group = MessageGroup.newBuilder() .addMessages(csvMessage) @@ -276,7 +268,7 @@ private void settingMessageTypeFromIncomingMessage(String configPropName, String () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("CSV", message.getMetadata().getMessageType()); + assertEquals(CUSTOM_TYPE, message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), From b2bb6fa14efa40216691f222c8d615a8d6308372 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Wed, 31 Aug 2022 12:37:28 +0400 Subject: [PATCH 3/3] rename 'customType' variable --- src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java index 5ad05c4..317292c 100644 --- a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java @@ -246,12 +246,12 @@ void skipsHeaderPublishing() { @Test void settingMessageTypeFromIncomingMessage() { - final var CUSTOM_TYPE = "csv_test_type"; + final var customType = "csv_test_type"; final var config = new CsvCodecConfiguration(); config.setPublishHeader(false); CsvCodec codec = createCodec(config); - final var csvMessage = createCsvMessage(Map.of("th2.csv.override_message_type", CUSTOM_TYPE), "A,B,C", "1,2,3"); + final var csvMessage = createCsvMessage(Map.of("th2.csv.override_message_type", customType), "A,B,C", "1,2,3"); MessageGroup group = MessageGroup.newBuilder() .addMessages(csvMessage) @@ -268,7 +268,7 @@ void settingMessageTypeFromIncomingMessage() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals(CUSTOM_TYPE, message.getMetadata().getMessageType()); + assertEquals(customType, message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),