Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ delimiter: ','
encoding: UTF-8
display-name: CodecCsv
validate-length: true
publish-header: false
```
**default-header** - the default header for this codec. It will be used if no header found in the received batch.

Expand All @@ -75,7 +76,9 @@ validate-length: true
**display-name** - the name to set in the root event sent to the event store. All errors during decoding will be attached to that root event.
The default value for the name is `CodecCsv`.

**validate-length** - check if csv have different count of values against header's count
**validate-length** - check if csv have different count of values against header's count.

**publish-header** - set to publish decoded header. The default value is `false`.

## Full configuration example

Expand Down
32 changes: 12 additions & 20 deletions src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,32 +32,21 @@
import org.slf4j.LoggerFactory;

import com.csvreader.CsvReader;
import com.exactpro.th2.codec.CodecException;
import com.exactpro.th2.codec.DecodeException;
import com.exactpro.th2.codec.api.IPipelineCodec;
import com.exactpro.th2.codec.api.IReportingContext;
import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration;
import com.exactpro.th2.codec.util.MessageUtilKt;
import com.exactpro.th2.common.event.Event;
import com.exactpro.th2.common.event.Event.Status;
import com.exactpro.th2.common.event.EventUtils;
import com.exactpro.th2.common.grpc.AnyMessage;
import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.grpc.EventID;
import com.exactpro.th2.common.grpc.Message;
import com.exactpro.th2.common.grpc.Message.Builder;
import com.exactpro.th2.common.grpc.MessageGroup;
import com.exactpro.th2.common.grpc.MessageGroupBatch;
import com.exactpro.th2.common.grpc.MessageID;
import com.exactpro.th2.common.grpc.MessageMetadata;
import com.exactpro.th2.common.grpc.RawMessage;
import com.exactpro.th2.common.grpc.RawMessageMetadata;
import com.exactpro.th2.common.message.MessageUtils;
import com.exactpro.th2.common.schema.message.MessageListener;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.exactpro.th2.common.value.ValueUtils;
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;

public class CsvCodec implements IPipelineCodec {

Expand Down Expand Up @@ -168,12 +157,16 @@ private void decodeCsvData(Collection<ErrorHolder> errors, MessageGroup.Builder
LOGGER.debug("Set header to: " + Arrays.toString(strings));
}
header = strings;
AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder();
Builder headerMsg = Message.newBuilder();
// Not set message type
setMetadata(originalMetadata, headerMsg, HEADER_MSG_TYPE, currentIndex);
headerMsg.putFields(HEADER_FIELD_NAME, ValueUtils.toValue(strings));
messageBuilder.setMessage(headerMsg);

if (configuration.isPublishHeader()) {
AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder();
Builder headerMsg = Message.newBuilder();
// Not set message type
setMetadata(originalMetadata, headerMsg, HEADER_MSG_TYPE, currentIndex);
headerMsg.putFields(HEADER_FIELD_NAME, ValueUtils.toValue(strings));
messageBuilder.setMessage(headerMsg);
}

continue;
}

Expand Down Expand Up @@ -207,7 +200,6 @@ private void decodeCsvData(Collection<ErrorHolder> errors, MessageGroup.Builder
}
}


messageBuilder.setMessage(builder);
}
}
Expand Down Expand Up @@ -279,4 +271,4 @@ private void trimEachElement(String[] elements) {
elements[i] = element.trim();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,10 @@ public class CsvCodecConfiguration implements IPipelineCodecSettings {
@JsonPropertyDescription("Set to validate length of columns or not")
private boolean validateLength = true;

@JsonProperty("publish-header")
@JsonPropertyDescription("Set to enable header publication")
private boolean publishHeader = false;

public List<String> getDefaultHeader() {
return defaultHeader;
}
Expand Down Expand Up @@ -83,4 +87,12 @@ public boolean getValidateLength() {
public void setValidateLength(boolean validateLength) {
this.validateLength = validateLength;
}

public boolean isPublishHeader() {
return publishHeader;
}

public void setPublishHeader(boolean publishHeader) {
this.publishHeader = publishHeader;
}
}
39 changes: 36 additions & 3 deletions src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@ class TestPositive {
void decodeArrayWithDifferentLength() throws IOException {
CsvCodecConfiguration configuration = new CsvCodecConfiguration();
configuration.setValidateLength(false);
configuration.setPublishHeader(true);
CsvCodec codec = createCodec(configuration);
MessageGroup group = MessageGroup.newBuilder()
.addMessages(createCsvMessage("A,B, , ,", "1,2,3,4"))
Expand Down Expand Up @@ -95,6 +96,7 @@ void decodeArrayWithDifferentLength() throws IOException {
void decodeArrayInEnd() throws IOException {
CsvCodecConfiguration configuration = new CsvCodecConfiguration();
configuration.setValidateLength(false);
configuration.setPublishHeader(true);
CsvCodec codec = createCodec(configuration);
MessageGroup group = MessageGroup.newBuilder()
.addMessages(createCsvMessage("A,B,C ,", "1,2,3"))
Expand Down Expand Up @@ -176,7 +178,7 @@ void decodeArrayInMiddle() throws IOException {
void decodesDataAndSkipsHeader() {
CsvCodec codec = createCodec();
MessageGroup group = MessageGroup.newBuilder()
.addMessages(createCsvMessage("A,B,C", "1,2,3"))
.addMessages(createCsvMessage("A,B,C", "1,2,3"))
.build();

MessageGroup value = codec.decode(group);
Expand Down Expand Up @@ -208,6 +210,34 @@ void decodesDataAndSkipsHeader() {
);
}

@Test
void skipsHeaderPublishing() {
final var config = new CsvCodecConfiguration();
config.setPublishHeader(false);
CsvCodec codec = createCodec(config);
MessageGroup group = MessageGroup.newBuilder()
.addMessages(createCsvMessage("A,B,C", "1,2,3"))
.build();

MessageGroup value = codec.decode(group);
assertEquals(1, value.getMessagesCount());

Message message = getMessage(value, 0);
assertFieldCount(3, message);

assertAll(
() -> assertAll("Current message: " + message,
() -> {
assertEquals(1, message.getMetadata().getId().getSubsequenceCount());
assertEquals(2, message.getMetadata().getId().getSubsequence(0));
},
() -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)),
() -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)),
() -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message))
)
);
}

@Test
void trimsEndOfTheLine() {
CsvCodec codec = createCodec();
Expand Down Expand Up @@ -311,6 +341,7 @@ void decodesDataWithEscapedCharacters() {
void decodesDataCustomDelimiter() {
CsvCodecConfiguration configuration = new CsvCodecConfiguration();
configuration.setDelimiter(';');
configuration.setPublishHeader(true);
CsvCodec codec = createCodec(configuration);

MessageGroup group = MessageGroup.newBuilder()
Expand Down Expand Up @@ -420,7 +451,9 @@ void reportsErrorIfDefaultHeaderAndDataHaveDifferentSize() {
}

private CsvCodec createCodec() {
return createCodec(new CsvCodecConfiguration());
final var configuration = new CsvCodecConfiguration();
configuration.setPublishHeader(true);
return createCodec(configuration);
}

private CsvCodec createCodec(CsvCodecConfiguration configuration) {
Expand Down