diff --git a/README.md b/README.md index 0044210..7b11bff 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ The codec decodes each raw message in the received batch. Each raw message might contain several line in CSV format. If the default header parameter is not set the codec trites the first line from the raw message as a header. Otherwise, the default header will be used for decoding the rest of data. +Output message type is taken from `th2.csv.override_message_type` property in input message. +If the property missing, the default value (`Csv_Message`) for output message type is used. If no data was decoded from raw message, the message will be skipped, and an error event will be reported. diff --git a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java index 140f3d9..acc2709 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java +++ b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java @@ -55,6 +55,8 @@ public class CsvCodec implements IPipelineCodec { private static final String CSV_MESSAGE_TYPE = "Csv_Message"; private static final String HEADER_FIELD_NAME = "Header"; + private static final String OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type"; + private final CsvCodecConfiguration configuration; private final String[] defaultHeader; private final Charset charset; @@ -137,6 +139,8 @@ public void close() { private void decodeCsvData(Collection errors, MessageGroup.Builder groupBuilder, RawMessage rawMessage, Iterable data) { RawMessageMetadata originalMetadata = rawMessage.getMetadata(); + final String outputMessageType = originalMetadata.getPropertiesOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE); + int currentIndex = 0; String[] header = defaultHeader; for (String[] strings : data) { @@ -184,7 +188,7 @@ private void decodeCsvData(Collection errors, MessageGroup.Builder Builder builder = Message.newBuilder(); // Not set message type - setMetadata(originalMetadata, builder, CSV_MESSAGE_TYPE, currentIndex); + setMetadata(originalMetadata, builder, outputMessageType, currentIndex); int headerLength = header.length; int rowLength = strings.length; diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java index c474b7c..317292c 100644 --- a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; @@ -79,6 +80,7 @@ void decodeArrayWithDifferentLength() throws IOException { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> { @@ -122,6 +124,7 @@ void decodeArrayInEnd() throws IOException { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -161,6 +164,7 @@ void decodeArrayInMiddle() throws IOException { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> { @@ -202,6 +206,7 @@ void decodesDataAndSkipsHeader() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -230,6 +235,40 @@ void skipsHeaderPublishing() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message)) + ) + ); + } + + @Test + void settingMessageTypeFromIncomingMessage() { + final var customType = "csv_test_type"; + + final var config = new CsvCodecConfiguration(); + config.setPublishHeader(false); + CsvCodec codec = createCodec(config); + final var csvMessage = createCsvMessage(Map.of("th2.csv.override_message_type", customType), "A,B,C", "1,2,3"); + + MessageGroup group = MessageGroup.newBuilder() + .addMessages(csvMessage) + .build(); + + MessageGroup value = codec.decode(group); + assertEquals(1, value.getMessagesCount()); + + Message message = getMessage(value, 0); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals(customType, message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -265,6 +304,7 @@ void trimsEndOfTheLine() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -293,6 +333,7 @@ void decodesDataUsingDefaultHeader() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(1, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -330,6 +371,7 @@ void decodesDataWithEscapedCharacters() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("\"value\"", getFieldValue(message, "B", () -> "No field B. " + message)) @@ -370,6 +412,7 @@ void decodesDataCustomDelimiter() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("3", getFieldValue(message, "B", () -> "No field B. " + message)) @@ -407,6 +450,7 @@ void trimsWhitespacesDuringDecoding() { () -> { assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); }, () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), () -> assertEquals("", getFieldValue(message, "B", () -> "No field B. " + message)), @@ -461,10 +505,15 @@ private CsvCodec createCodec(CsvCodecConfiguration configuration) { } private AnyMessage createCsvMessage(String... data) { + return createCsvMessage(Map.of(), data); + } + + private AnyMessage createCsvMessage(Map metadataProps, String... data) { Builder builder = RawMessage.newBuilder() .setBody(ByteString.copyFrom(String.join(StringUtils.LF, data).getBytes(StandardCharsets.UTF_8))); RawMessageMetadata.Builder metadataBuilder = RawMessageMetadata.newBuilder() - .setId(MessageID.newBuilder().setSequence(System.nanoTime()).build()); + .setId(MessageID.newBuilder().setSequence(System.nanoTime()).build()) + .putAllProperties(metadataProps); builder.setMetadata(metadataBuilder.build()); return AnyMessage.newBuilder().setRawMessage(builder).build(); }