Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The codec decodes each raw message in the received batch.
Each raw message might contain several line in CSV format.
If the default header parameter is not set the codec trites the first line from the raw message as a header.
Otherwise, the default header will be used for decoding the rest of data.
Output message type is taken from `th2.csv.override_message_type` property in input message.
If the property missing, the default value (`Csv_Message`) for output message type is used.

If no data was decoded from raw message, the message will be skipped, and an error event will be reported.

Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class CsvCodec implements IPipelineCodec {
private static final String CSV_MESSAGE_TYPE = "Csv_Message";
private static final String HEADER_FIELD_NAME = "Header";

private static final String OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type";

private final CsvCodecConfiguration configuration;
private final String[] defaultHeader;
private final Charset charset;
Expand Down Expand Up @@ -137,6 +139,8 @@ public void close() {
private void decodeCsvData(Collection<ErrorHolder> errors, MessageGroup.Builder groupBuilder, RawMessage rawMessage, Iterable<String[]> data) {
RawMessageMetadata originalMetadata = rawMessage.getMetadata();

final String outputMessageType = originalMetadata.getPropertiesOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE);

int currentIndex = 0;
String[] header = defaultHeader;
for (String[] strings : data) {
Expand Down Expand Up @@ -184,7 +188,7 @@ private void decodeCsvData(Collection<ErrorHolder> errors, MessageGroup.Builder

Builder builder = Message.newBuilder();
// Not set message type
setMetadata(originalMetadata, builder, CSV_MESSAGE_TYPE, currentIndex);
setMetadata(originalMetadata, builder, outputMessageType, currentIndex);

int headerLength = header.length;
int rowLength = strings.length;
Expand Down
51 changes: 50 additions & 1 deletion src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -79,6 +80,7 @@ void decodeArrayWithDifferentLength() throws IOException {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> {
Expand Down Expand Up @@ -122,6 +124,7 @@ void decodeArrayInEnd() throws IOException {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),
Expand Down Expand Up @@ -161,6 +164,7 @@ void decodeArrayInMiddle() throws IOException {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> {
Expand Down Expand Up @@ -202,6 +206,7 @@ void decodesDataAndSkipsHeader() {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),
Expand Down Expand Up @@ -230,6 +235,40 @@ void skipsHeaderPublishing() {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),
() -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message))
)
);
}

@Test
void settingMessageTypeFromIncomingMessage() {
final var customType = "csv_test_type";

final var config = new CsvCodecConfiguration();
config.setPublishHeader(false);
CsvCodec codec = createCodec(config);
final var csvMessage = createCsvMessage(Map.of("th2.csv.override_message_type", customType), "A,B,C", "1,2,3");

MessageGroup group = MessageGroup.newBuilder()
.addMessages(csvMessage)
.build();

MessageGroup value = codec.decode(group);
assertEquals(1, value.getMessagesCount());

Message message = getMessage(value, 0);
assertFieldCount(3, message);

assertAll(
() -> assertAll("Current message: " + message,
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals(customType, message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),
Expand Down Expand Up @@ -265,6 +304,7 @@ void trimsEndOfTheLine() {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),
Expand Down Expand Up @@ -293,6 +333,7 @@ void decodesDataUsingDefaultHeader() {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(1, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),
Expand Down Expand Up @@ -330,6 +371,7 @@ void decodesDataWithEscapedCharacters() {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("\"value\"", getFieldValue(message, "B", () -> "No field B. " + message))
Expand Down Expand Up @@ -370,6 +412,7 @@ void decodesDataCustomDelimiter() {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("3", getFieldValue(message, "B", () -> "No field B. " + message))
Expand Down Expand Up @@ -407,6 +450,7 @@ void trimsWhitespacesDuringDecoding() {
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
assertEquals("Csv_Message", message.getMetadata().getMessageType());
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("", getFieldValue(message, "B", () -> "No field B. " + message)),
Expand Down Expand Up @@ -461,10 +505,15 @@ private CsvCodec createCodec(CsvCodecConfiguration configuration) {
}

private AnyMessage createCsvMessage(String... data) {
return createCsvMessage(Map.of(), data);
}

private AnyMessage createCsvMessage(Map<String, String> metadataProps, String... data) {
Builder builder = RawMessage.newBuilder()
.setBody(ByteString.copyFrom(String.join(StringUtils.LF, data).getBytes(StandardCharsets.UTF_8)));
RawMessageMetadata.Builder metadataBuilder = RawMessageMetadata.newBuilder()
.setId(MessageID.newBuilder().setSequence(System.nanoTime()).build());
.setId(MessageID.newBuilder().setSequence(System.nanoTime()).build())
.putAllProperties(metadataProps);
builder.setMetadata(metadataBuilder.build());
return AnyMessage.newBuilder().setRawMessage(builder).build();
}
Expand Down