diff --git a/.github/workflows/build-dev-release.yml b/.github/workflows/build-dev-release.yml new file mode 100644 index 0000000..6cc5c82 --- /dev/null +++ b/.github/workflows/build-dev-release.yml @@ -0,0 +1,19 @@ +name: Build and publish dev release Docker image to Github Container Registry ghcr.io and publish dev release jar to sonatype + +on: workflow_dispatch + +jobs: + build: + uses: th2-net/.github/.github/workflows/compound-java.yml@main + with: + build-target: 'Sonatype,Docker' + devRelease: true + createTag: true + docker-username: ${{ github.actor }} + secrets: + docker-password: ${{ secrets.GITHUB_TOKEN }} + sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} + sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} + sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} + sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} + nvd-api-key: ${{ secrets.NVD_APIKEY }} \ No newline at end of file diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml new file mode 100644 index 0000000..9251e4e --- /dev/null +++ b/.github/workflows/build-release.yml @@ -0,0 +1,19 @@ +name: Build and publish release Docker image to Github Container Registry ghcr.io and publish release jar to sonatype + +on: workflow_dispatch + +jobs: + build: + uses: th2-net/.github/.github/workflows/compound-java.yml@main + with: + build-target: 'Sonatype,Docker' + devRelease: false + createTag: true + docker-username: ${{ github.actor }} + secrets: + docker-password: ${{ secrets.GITHUB_TOKEN }} + sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} + sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} + sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} + sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} + nvd-api-key: ${{ secrets.NVD_APIKEY }} \ No newline at end of file diff --git a/.github/workflows/build-sanpshot.yml b/.github/workflows/build-sanpshot.yml new file mode 100644 index 
0000000..fc014bf --- /dev/null +++ b/.github/workflows/build-sanpshot.yml @@ -0,0 +1,24 @@ +name: Build and publish Docker image to Github Container Registry ghcr.io and publish snapshot jar to sonatype + +on: + push: + branches-ignore: + - master + - version-* + - dependabot** + paths-ignore: + - README.md + +jobs: + build-job: + uses: th2-net/.github/.github/workflows/compound-java-dev.yml@main + with: + build-target: 'Sonatype,Docker' + docker-username: ${{ github.actor }} + secrets: + docker-password: ${{ secrets.GITHUB_TOKEN }} + sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} + sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} + sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} + sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} + nvd-api-key: ${{ secrets.NVD_APIKEY }} \ No newline at end of file diff --git a/.github/workflows/dev-docker-publish.yml b/.github/workflows/dev-docker-publish.yml deleted file mode 100644 index 6f77755..0000000 --- a/.github/workflows/dev-docker-publish.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: Dev build and publish Docker distributions to Github Container Registry ghcr.io - -on: - push: - branches-ignore: - - master - - version-* - - dependabot** - paths-ignore: - - README.md - -jobs: - build-job: - uses: th2-net/.github/.github/workflows/compound-java-dev.yml@main - with: - build-target: 'Docker' - docker-username: ${{ github.actor }} - secrets: - docker-password: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml deleted file mode 100644 index cec50a6..0000000 --- a/.github/workflows/docker-publish.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: Build and publish Docker distributions to Github Container Registry ghcr.io - -on: - push: - branches: - - master - - version-* - paths: - - gradle.properties -# - package_info.json - -jobs: - build-job: - uses: th2-net/.github/.github/workflows/compound-java.yml@main - with: - 
build-target: 'Docker' - docker-username: ${{ github.actor }} - secrets: - docker-password: ${{ secrets.GITHUB_TOKEN }} - \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 5b78c51..803aa93 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:6.6-jdk11 AS build +FROM gradle:7.6-jdk11 AS build ARG release_version COPY ./ . RUN gradle build dockerPrepare \ diff --git a/README.md b/README.md index 42146b6..5e7e24e 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,18 @@ -# Csv codec (4.1.0) +# Csv codec (5.3.0) + ## Description -Designed for decode csv raw messages from csv reader to the parsed messages. -It is based on [th2-codec](https://github.com/th2-net/th2-codec). -You can find additional information [here](https://github.com/th2-net/th2-codec/blob/master/README.md) + +Designed to decode csv raw messages from csv reader into parsed messages. It is based +on [th2-codec](https://github.com/th2-net/th2-codec). You can find additional +information [here](https://github.com/th2-net/th2-codec/blob/master/README.md) ## Decoding -The codec decodes each raw message in the received batch. -Each raw message might contain several line in CSV format. -If the default header parameter is not set the codec trites the first line from the raw message as a header. -Otherwise, the default header will be used for decoding the rest of data. -Output message type is taken from `th2.csv.override_message_type` property in input message. -If the property missing, the default value (`Csv_Message`) for output message type is used. +The codec decodes each raw message in the received batch. Each raw message might contain several lines in CSV format. If +the default header parameter is not set the codec treats the first line from the raw message as a header. Otherwise, the +default header will be used for decoding the rest of the data. Output message type is taken +from `th2.csv.override_message_type` property in input message.
If the property is missing, the default +value (`Csv_Message`) for output message type is used. If no data was decoded from raw message, the message will be skipped, and an error event will be reported. @@ -19,7 +20,7 @@ If no data was decoded from raw message, the message will be skipped, and an err ## Decode Example -Simple example: +Simple example: ```text A, B, V, G @@ -52,36 +53,46 @@ into { "A": 1, "B": 2, - "V": [3, 4, 5], + "V": [ + 3, + 4, + 5 + ], "G": 6, "D": 7 } ``` ## Settings + Csv codec has the following parameters: ```yaml -default-header: [A, B, C] +default-header: [ A, B, C ] delimiter: ',' encoding: UTF-8 display-name: CodecCsv validate-length: true publish-header: false +trim-whitespace: true ``` + **default-header** - the default header for this codec. It will be used if no header found in the received batch. + codec-csv trims all values in `default-header` and executes blank check. **delimiter** - the delimiter to split values in received data. The default value is `,`. **encoding** - the encoding for the received data. The default value is `UTF-8`. -**display-name** - the name to set in the root event sent to the event store. All errors during decoding will be attached to that root event. -The default value for the name is `CodecCsv`. +**display-name** - the name to set in the root event sent to the event store. All errors during decoding will be +attached to that root event. The default value for the name is `CodecCsv`. **validate-length** - check if csv have different count of values against header's count. **publish-header** - set to publish decoded header. The default value is `false`. +**trim-whitespace** - set to trim whitespace in the header (when `default-header` isn't set) and in cell values. The default value is `true`.
+ ## Full configuration example ```yaml @@ -94,7 +105,7 @@ spec: image-version: 4.0.0 custom-config: codecSettings: - default-header: [A, B, C] + default-header: [ A, B, C ] delimiter: ',' encoding: UTF-8 pins: @@ -108,28 +119,59 @@ spec: # decoder - name: in_codec_decode connection-type: mq - attributes: ['decoder_in', 'raw', 'subscribe'] + attributes: [ 'decoder_in', 'raw', 'subscribe' ] - name: out_codec_decode connection-type: mq - attributes: ['decoder_out', 'parsed', 'publish'] + attributes: [ 'decoder_out', 'parsed', 'publish' ] # encoder general (technical) - name: in_codec_general_encode connection-type: mq - attributes: ['general_encoder_in', 'parsed', 'subscribe'] + attributes: [ 'general_encoder_in', 'parsed', 'subscribe' ] - name: out_codec_general_encode connection-type: mq - attributes: ['general_encoder_out', 'raw', 'publish'] + attributes: [ 'general_encoder_out', 'raw', 'publish' ] # decoder general (technical) - name: in_codec_general_decode connection-type: mq - attributes: ['general_decoder_in', 'raw', 'subscribe'] + attributes: [ 'general_decoder_in', 'raw', 'subscribe' ] - name: out_codec_general_decode connection-type: mq - attributes: ['general_decoder_out', 'parsed', 'publish'] + attributes: [ 'general_decoder_out', 'parsed', 'publish' ] ``` ## Release notes +### 5.3.0 ++ Migrated to th2 gradle plugin: `0.0.6` ++ Updated: + + bom `4.6.1` + + common: `5.10.1-dev` + + common-utils: `2.2.3-dev` + + codec: `5.5.0-dev` + +### 5.2.1 + ++ Updated common: `5.7.2-dev` ++ Updated codec: `5.4.1-dev` + +### 5.2.0 + ++ Added `trim-whitespace` option. 
++ Updated common:5.7.1-dev ++ Updated common-utils:2.2.2-dev ++ Updated codec:5.4.0-dev + +### 5.1.0 + ++ Supports th2 transport protocol ++ Updated bom:4.4.0 ++ Updated common:5.3.0 ++ Updated codec:5.3.0 + +### 5.0.0 + ++ Migrated to books&pages concept + ### 4.1.0 + Migrated to `th2-codec:4.8.0` @@ -143,8 +185,13 @@ spec: ### 4.0.0 ++ Migration to **books/pages** cradle `4.0.0` ++ common updated from `3.13.1` to `4.0.0` ++ bom updated from `2.10.1` to `3.1.0` ++ bintray switched to **sonatype** + + Migrated to `th2-codec` core part. Uses the standard configuration format and pins for th2-codec -+ Updated `bom`: `3.1.0` -> `4.1.0` ++ Updated `bom`: `3.1.0` -> `4.2.0` + Updated `common`: `3.37.1` -> `3.44.0` ### 3.2.1 @@ -160,7 +207,8 @@ spec: + reads dictionaries from the /var/th2/config/dictionary folder. + uses mq_router, grpc_router, cradle_manager optional JSON configs from the /var/th2/config folder -+ tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, default configuration ++ tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, + default configuration + update Cradle version. 
Introduce async API for storing events + removed gRPC event loop handling + fixed dictionary reading diff --git a/build.gradle b/build.gradle index 2cb155a..a7cc66e 100644 --- a/build.gradle +++ b/build.gradle @@ -1,29 +1,21 @@ plugins { - id 'java-library' - id 'application' - id 'com.palantir.docker' version '0.25.0' - id "org.owasp.dependencycheck" version "8.1.0" -} - -dependencyCheck { - format='HTML' -} + id "org.jetbrains.kotlin.jvm" version "1.8.22" + id "org.jetbrains.kotlin.kapt" version "1.8.22" + id("java-library") + id("maven-publish") -ext { - sharedDir = file("${project.rootDir}/shared") + id "com.exactpro.th2.gradle.publish" version "0.0.6" + id "com.exactpro.th2.gradle.component" version "0.0.6" } group = 'com.exactpro.th2' version = release_version -sourceCompatibility = 11 -targetCompatibility = 11 +kotlin { + jvmToolchain(11) +} repositories { - maven { - name 'MavenLocal' - url sharedDir - } mavenCentral() maven { name 'Sonatype_snapshots' @@ -33,72 +25,43 @@ repositories { name 'Sonatype_releases' url 'https://s01.oss.sonatype.org/content/repositories/releases/' } + mavenLocal() - configurations.all { + configurations.configureEach { resolutionStrategy.cacheChangingModulesFor 0, 'seconds' resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' } } -jar { - manifest { - attributes( - 'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})", - 'Specification-Title': '', - 'Specification-Vendor': 'Exactpro Systems LLC', - 'Implementation-Title': project.archivesBaseName, - 'Implementation-Vendor': 'Exactpro Systems LLC', - 'Implementation-Vendor-Id': 'com.exactpro', - 'Implementation-Version': project.version - ) - } -} - dependencies { - api platform('com.exactpro.th2:bom:4.2.0') + implementation "com.exactpro.th2:common:5.10.1-dev" + implementation "com.exactpro.th2:common-utils:2.2.3-dev" + implementation "com.exactpro.th2:codec:5.5.0-dev" - implementation 'com.exactpro.th2:common:3.44.1' - implementation 
'com.exactpro.th2:codec:4.8.1' + implementation "org.apache.commons:commons-lang3" + implementation "com.fasterxml.jackson.core:jackson-databind" - implementation 'net.sourceforge.javacsv:javacsv:2.0' - implementation 'org.jetbrains:annotations:23.0.0' + implementation "net.sourceforge.javacsv:javacsv:2.0" - compileOnly 'com.google.auto.service:auto-service-annotations:1.0.1' - annotationProcessor 'com.google.auto.service:auto-service:1.0.1' + implementation "io.github.microutils:kotlin-logging:3.0.5" - testImplementation 'org.mockito:mockito-core:3.5.15' - testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2' -} + compileOnly "com.google.auto.service:auto-service-annotations:1.1.1" + kapt "com.google.auto.service:auto-service:1.1.1" -test { - useJUnitPlatform() + testImplementation "org.jetbrains.kotlin:kotlin-test-junit" + testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + testImplementation 'org.mockito.kotlin:mockito-kotlin:5.3.1' + testImplementation 'io.strikt:strikt-core:0.34.1' } application { - mainClassName 'com.exactpro.th2.codec.MainKt' -} - -applicationName = 'service' - -distTar { - archiveFileName.set("${applicationName}.tar") + mainClass.set("com.exactpro.th2.codec.MainKt") } -dockerPrepare { - dependsOn distTar -} - -docker { - copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) +test { + useJUnitPlatform() } dependencyCheck { - formats=['SARIF', 'JSON', 'HTML'] - failBuildOnCVSS=5 - - analyzers { - assemblyEnabled = false - nugetconfEnabled = false - nodeEnabled = false - } + suppressionFile='supressions.xml' } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 878d6ca..0752ea5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1 @@ -release_version = 4.1.0 - -docker_image_name= +release_version=5.3.0 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index af547a8..6c9fe73 100644 --- 
a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip diff --git a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java index e0c860f..29fc44c 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java +++ b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java @@ -32,14 +32,7 @@ @AutoService(IPipelineCodecFactory.class) public class CodecFactory implements IPipelineCodecFactory { - public static final String PROTOCOL = "csv"; - private static final Set PROTOCOLS = Collections.singleton(PROTOCOL); - - @NotNull - @Override - public String getProtocol() { - return PROTOCOL; - } + private static final Set PROTOCOLS = Collections.singleton("csv"); @NotNull @Override @@ -73,4 +66,4 @@ public void init(@NotNull InputStream inputStream) { @Override public void close() { } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java deleted file mode 100644 index a047bec..0000000 --- a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.codec.csv; - -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.csvreader.CsvReader; -import com.exactpro.th2.codec.DecodeException; -import com.exactpro.th2.codec.api.IPipelineCodec; -import com.exactpro.th2.codec.api.IReportingContext; -import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration; -import com.exactpro.th2.common.grpc.AnyMessage; -import com.exactpro.th2.common.grpc.Message; -import com.exactpro.th2.common.grpc.Message.Builder; -import com.exactpro.th2.common.grpc.MessageGroup; -import com.exactpro.th2.common.grpc.MessageID; -import com.exactpro.th2.common.grpc.MessageMetadata; -import com.exactpro.th2.common.grpc.RawMessage; -import com.exactpro.th2.common.grpc.RawMessageMetadata; -import com.exactpro.th2.common.message.MessageUtils; -import com.exactpro.th2.common.value.ValueUtils; -import com.google.protobuf.ByteString; - -public class CsvCodec implements IPipelineCodec { - - private static final Logger LOGGER = LoggerFactory.getLogger(CsvCodec.class); - private static final String HEADER_MSG_TYPE = "Csv_Header"; - private static final String CSV_MESSAGE_TYPE = "Csv_Message"; - private static final String 
HEADER_FIELD_NAME = "Header"; - - private static final String OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type"; - - private final CsvCodecConfiguration configuration; - private final String[] defaultHeader; - private final Charset charset; - - public CsvCodec(CsvCodecConfiguration configuration) { - this.configuration = requireNonNull(configuration, "'Configuration' parameter"); - List defaultHeader = configuration.getDefaultHeader(); - this.defaultHeader = defaultHeader == null - ? null - : defaultHeader.stream().map(String::trim).toArray(String[]::new); - if (this.defaultHeader != null && this.defaultHeader.length == 0) { - throw new IllegalArgumentException("Default header must not be empty"); - } - LOGGER.info("Default header: {}", configuration.getDefaultHeader()); - charset = Charset.forName(configuration.getEncoding()); - } - - @NotNull - @Override - public MessageGroup decode(@NotNull MessageGroup messageGroup) { - MessageGroup.Builder groupBuilder = MessageGroup.newBuilder(); - Collection errors = new ArrayList<>(); - for (AnyMessage anyMessage : messageGroup.getMessagesList()) { - if (anyMessage.hasMessage()) { - groupBuilder.addMessages(anyMessage); - continue; - } - if (!anyMessage.hasRawMessage()) { - LOGGER.error("Message should either have a raw or parsed message but has nothing: {}", anyMessage); - continue; - } - RawMessage rawMessage = anyMessage.getRawMessage(); - String protocol = rawMessage.getMetadata().getProtocol(); - if (!"".equals(protocol) && !"csv".equalsIgnoreCase(protocol)) { - LOGGER.error("Wrong protocol: message should have empty or 'csv' protocol but has {}", protocol); - continue; - } - - ByteString body = rawMessage.getBody(); - List data = decodeValues(body); - if (data.isEmpty()) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("The raw message does not contains any data: {}", MessageUtils.toJson(rawMessage)); - } - errors.add(new ErrorHolder("The raw message does not contains any data", 
rawMessage)); - continue; - } - decodeCsvData(errors, groupBuilder, rawMessage, data); - } - if (!errors.isEmpty()) { - throw createException(errors); - } - return groupBuilder.build(); - } - - @NotNull - @Override - public MessageGroup decode(@NotNull MessageGroup messageGroup, @NotNull IReportingContext iReportingContext) { - return decode(messageGroup); - } - - private RuntimeException createException(Collection errors) { - return new DecodeException( - "Cannot decode some messages: " + System.lineSeparator() + errors.stream() - .map(it -> "Message " + MessageUtils.toJson(it.originalMessage.getMetadata().getId()) + " cannot be decoded because " + it.text) - .collect(Collectors.joining(System.lineSeparator())) - ); - } - - @NotNull - @Override - public MessageGroup encode(@NotNull MessageGroup messageGroup) { - throw new UnsupportedOperationException("encode method is not implemented"); - } - - @NotNull - @Override - public MessageGroup encode(@NotNull MessageGroup messageGroup, @NotNull IReportingContext iReportingContext) { - throw new UnsupportedOperationException("encode method is not implemented"); - } - - @Override - public void close() { - } - - private void decodeCsvData(Collection errors, MessageGroup.Builder groupBuilder, RawMessage rawMessage, Iterable data) { - RawMessageMetadata originalMetadata = rawMessage.getMetadata(); - - final String outputMessageType = originalMetadata.getPropertiesOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE); - - int currentIndex = 0; - String[] header = defaultHeader; - for (String[] strings : data) { - currentIndex++; - - if (strings.length == 0) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("Empty raw at {} index (starts with 1). 
Data: {}", currentIndex, data); - } - errors.add(new ErrorHolder("Empty raw at " + currentIndex + " index (starts with 1)", rawMessage)); - continue; - } - - trimEachElement(strings); - - if (header == null) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Set header to: " + Arrays.toString(strings)); - } - header = strings; - - if (configuration.isPublishHeader()) { - AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder(); - Builder headerMsg = Message.newBuilder(); - // Not set message type - setMetadata(originalMetadata, headerMsg, HEADER_MSG_TYPE, currentIndex); - headerMsg.putFields(HEADER_FIELD_NAME, ValueUtils.toValue(strings)); - messageBuilder.setMessage(headerMsg); - } - - continue; - } - - if (strings.length != header.length && configuration.getValidateLength()) { - String msg = String.format("Wrong fields count in message. Expected count: %d; actual: %d; session alias: %s", - header.length, strings.length, originalMetadata.getId().getConnectionId().getSessionAlias()); - LOGGER.error(msg); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(rawMessage.toString()); - } - errors.add(new ErrorHolder(msg, strings, rawMessage)); - } - - AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder(); - - Builder builder = Message.newBuilder(); - // Not set message type - setMetadata(originalMetadata, builder, outputMessageType, currentIndex); - - int headerLength = header.length; - int rowLength = strings.length; - for (int i = 0; i < headerLength && i < rowLength; ) { - int extraLength = getHeaderArrayLength(header, i); - if (extraLength == 1) { - builder.putFields(header[i], ValueUtils.toValue(strings[i])); - i++; - } else { - String[] values = copyArray(strings, i, i+extraLength); - builder.putFields(header[i], ValueUtils.toValue(values)); - i+=extraLength; - } - } - - messageBuilder.setMessage(builder); - } - } - - public static String [] copyArray(String [] original, int from, int to){ - String [] copyArr = new String[Integer.min(to, 
original.length) - from]; - for (int i = from; i < to && i < original.length; i++){ - copyArr[i-from] = original[i]; - } - return copyArr; - } - - private int getHeaderArrayLength(String[] header, int index) { - int length = 1; - for (int i = index + 1; i < header.length && header[i].isEmpty(); i++) { - length++; - } - return length; - } - - private void setMetadata(RawMessageMetadata originalMetadata, Message.Builder messageBuilder, String messageType, int currentIndex) { - messageBuilder.setMetadata(MessageMetadata - .newBuilder() - .setId(MessageID.newBuilder(originalMetadata.getId()) - .addSubsequence(currentIndex) - .build()) - .setTimestamp(originalMetadata.getTimestamp()) - .putAllProperties(originalMetadata.getPropertiesMap()) - .setMessageType(messageType) - ); - } - - private List decodeValues(ByteString body) { - try (InputStream in = new ByteArrayInputStream(body.toByteArray())) { - CsvReader reader = new CsvReader(in, configuration.getDelimiter(), charset); - try { - List result = new ArrayList<>(); - while (reader.readRecord()) { - result.add(reader.getValues()); - } - return result; - } finally { - reader.close(); - } - } catch (IOException e) { - throw new RuntimeException("cannot read data from raw bytes", e); - } - } - - private static class ErrorHolder { - public final String text; - public final String[] currentRow; - public final RawMessage originalMessage; - - private ErrorHolder(String text, String[] currentRow, RawMessage originalMessage) { - this.text = text; - this.currentRow = currentRow; - this.originalMessage = originalMessage; - } - - private ErrorHolder(String text, RawMessage originalMessage) { - this(text, null, originalMessage); - } - } - - private void trimEachElement(String[] elements) { - for (int i = 0; i < elements.length; i++) { - String element = elements[i]; - elements[i] = element.trim(); - } - } -} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java 
b/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java index acfdcf4..cddf1d0 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java +++ b/src/main/java/com/exactpro/th2/codec/csv/cfg/CsvCodecConfiguration.java @@ -16,14 +16,14 @@ package com.exactpro.th2.codec.csv.cfg; -import java.nio.charset.StandardCharsets; -import java.util.List; - import com.exactpro.th2.codec.api.IPipelineCodecSettings; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import java.nio.charset.StandardCharsets; +import java.util.List; + public class CsvCodecConfiguration implements IPipelineCodecSettings { @JsonProperty("default-header") @JsonPropertyDescription("The default header that will be used for parsing received batch if no header found in the batch") @@ -48,6 +48,10 @@ public class CsvCodecConfiguration implements IPipelineCodecSettings { @JsonPropertyDescription("Set to enable header publication") private boolean publishHeader = false; + @JsonProperty("trim-whitespace") + @JsonPropertyDescription("Set to trim whitespace in header (when default-header isn't set) and cell") + private boolean trimWhitespace = true; + public List getDefaultHeader() { return defaultHeader; } @@ -95,4 +99,12 @@ public boolean isPublishHeader() { public void setPublishHeader(boolean publishHeader) { this.publishHeader = publishHeader; } + + public boolean isTrimWhitespace() { + return trimWhitespace; + } + + public void setTrimWhitespace(boolean trimWhitespace) { + this.trimWhitespace = trimWhitespace; + } } diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt new file mode 100644 index 0000000..8964f16 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt @@ -0,0 +1,202 @@ +/* + * Copyright 2023 Exactpro 
(Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.csvreader.CsvReader +import com.exactpro.th2.codec.DecodeException +import mu.KotlinLogging +import java.io.ByteArrayInputStream +import java.io.IOException +import java.nio.charset.Charset +import kotlin.math.min + +abstract class AbstractDecoder( + private val charset: Charset, + private val csvDelimiter: Char, + private val defaultHeader: Array?, + private val publishHeader: Boolean, + private val validateLength: Boolean, + private val trimWhitespace: Boolean +) { + protected abstract val RAW_MESSAGE.messageMetadata: Map + protected abstract val RAW_MESSAGE.messageSessionAlias: String + protected abstract val RAW_MESSAGE.rawBody: ByteArray + protected abstract val RAW_MESSAGE.messageProtocol: String + protected abstract val RAW_MESSAGE.logId: String + protected abstract val RAW_MESSAGE.logData: String + protected abstract val ANY_MESSAGE.isParsed: Boolean + protected abstract val ANY_MESSAGE.isRaw: Boolean + protected abstract val ANY_MESSAGE.asRaw: RAW_MESSAGE + + protected abstract fun createParsedMessage(sourceMessage: RAW_MESSAGE, outputMessageType: String, body: Map, currentIndex: Int): ANY_MESSAGE + protected abstract fun String.toFieldValue(): BODY_FIELD_VALUE + protected abstract fun Array.toFieldValue(): BODY_FIELD_VALUE + + fun decode(messageGroup: List): List { + val groupBuilder = mutableListOf() // 
/**
 * Turns the CSV rows read from [rawMessage] into parsed messages and appends them to [groupBuilder].
 *
 * The header is the configured default header or, when none is configured, the first row of [data]
 * (that row is emitted as a separate header message only when header publishing is enabled).
 * Every later row is mapped onto the header: a header cell followed by empty header cells receives
 * the matching run of row values as an array, any other cell receives a single value.
 *
 * Empty rows are logged, recorded in [errors] and skipped. When length validation is enabled, a row
 * whose size differs from the header is logged and recorded in [errors] as well, but it is still
 * converted and appended.
 */
private fun decodeCsvData(
    errors: MutableCollection<ErrorHolder<RAW_MESSAGE>>,
    groupBuilder: MutableList<ANY_MESSAGE>,
    rawMessage: RAW_MESSAGE,
    data: Iterable<Array<String>>
) {
    val outputMessageType = rawMessage.messageMetadata.getOrDefault(
        OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE,
        CSV_MESSAGE_TYPE
    )
    var header: Array<String>? = defaultHeader

    for ((position, row) in data.withIndex()) {
        // Row numbers reported in logs, errors and message ids are 1-based.
        val currentIndex = position + 1

        if (row.isEmpty()) {
            LOGGER.error { "Empty raw at $currentIndex index (starts with 1). Data: $data" }
            errors.add(ErrorHolder("Empty raw at $currentIndex index (starts with 1)", rawMessage))
            continue
        }

        if (trimWhitespace) {
            trimEachElement(row)
        }

        val activeHeader = header
        if (activeHeader == null) {
            // No header known yet: this row is the header and produces no data message.
            LOGGER.debug { "Set header to: ${row.contentToString()}" }
            header = row
            if (publishHeader) {
                groupBuilder.add(
                    createParsedMessage(
                        rawMessage,
                        HEADER_MSG_TYPE,
                        mapOf(HEADER_FIELD_NAME to row.toFieldValue()),
                        currentIndex
                    )
                )
            }
            continue
        }

        if (validateLength && row.size != activeHeader.size) {
            val msg = "Wrong fields count in message. Expected count: %d; actual: %d; session alias: %s".format(
                activeHeader.size, row.size, rawMessage.messageSessionAlias
            )
            LOGGER.error(msg)
            LOGGER.debug { rawMessage.toString() }
            errors.add(ErrorHolder(msg, rawMessage))
        }

        val body = mutableMapOf<String, BODY_FIELD_VALUE>()
        val limit = min(activeHeader.size, row.size)
        var column = 0
        while (column < limit) {
            // span > 1 means the header cell is followed by empty cells, i.e. it names an array.
            val span = getHeaderArrayLength(activeHeader, column)
            body[activeHeader[column]] = if (span == 1) {
                row[column].toFieldValue()
            } else {
                copyArray(row, column, column + span).toFieldValue()
            }
            column += span
        }

        groupBuilder.add(createParsedMessage(rawMessage, outputMessageType, body, currentIndex))
    }
}

/** Copies `original[from, to)`, clamping [to] to the array size so short rows do not overflow. */
private fun copyArray(original: Array<String>, from: Int, to: Int) =
    original.copyOfRange(from, to.coerceAtMost(original.size))

/**
 * Returns how many columns the header cell at [index] covers: the cell itself plus every
 * directly following header cell that is empty.
 */
private fun getHeaderArrayLength(header: Array<String>, index: Int): Int {
    var next = index + 1
    while (next < header.size && header[next].isEmpty()) {
        next++
    }
    return next - index
}

/**
 * Parses [body] as CSV using the configured delimiter and charset and returns one array per record.
 *
 * @throws RuntimeException wrapping the [IOException] when the data cannot be read
 */
private fun decodeValues(body: ByteArray): List<Array<String>> =
    try {
        ByteArrayInputStream(body).use { stream ->
            val reader = CsvReader(stream, csvDelimiter, charset)
            reader.trimWhitespace = trimWhitespace
            try {
                val records = ArrayList<Array<String>>()
                while (reader.readRecord()) {
                    records.add(reader.values)
                }
                records
            } finally {
                reader.close()
            }
        }
    } catch (e: IOException) {
        throw RuntimeException("cannot read data from raw bytes", e)
    }

/** Pairs an error description with the message it was raised for. */
private class ErrorHolder<T>(
    val text: String,
    val originalMessage: T
)

/** Trims every element of [elements] in place. */
private fun trimEachElement(elements: Array<String>) {
    elements.forEachIndexed { position, element ->
        elements[position] = element.trim()
    }
}

companion object {
    private val LOGGER = KotlinLogging.logger {}

    // Message types assigned to the published header row and to ordinary data rows.
    private const val HEADER_MSG_TYPE = "Csv_Header"
    private const val CSV_MESSAGE_TYPE = "Csv_Message"

    // Name of the single field carried by a header message.
    private const val HEADER_FIELD_NAME = "Header"

    // Raw-message metadata property that overrides the output message type.
    private const val OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type"
}
/**
 * Pipeline codec that decodes raw CSV messages for both supported message representations:
 * protobuf message groups and transport message groups.
 *
 * The charset and the optional default header are resolved once from [config] and shared by the
 * two decoders. A configured default header is trimmed element by element and must not be empty.
 */
class CsvCodec(private val config: CsvCodecConfiguration) : IPipelineCodec {

    private val charset: Charset = Charset.forName(config.encoding)

    private val defaultHeader: Array<String>? = config.defaultHeader
        ?.map(String::trim)
        ?.toTypedArray()
        ?.also { header ->
            require(header.isNotEmpty()) { "Default header must not be empty" }
            LOGGER.info { "Default header: ${config.defaultHeader}" }
        }

    private val protoDecoder = ProtoDecoder(
        charset = charset,
        csvDelimiter = config.delimiter,
        defaultHeader = defaultHeader,
        publishHeader = config.isPublishHeader,
        validateLength = config.validateLength,
        trimWhitespace = config.isTrimWhitespace,
    )

    private val transportDecoder = TransportDecoder(
        charset = charset,
        csvDelimiter = config.delimiter,
        defaultHeader = defaultHeader,
        publishHeader = config.isPublishHeader,
        validateLength = config.validateLength,
        trimWhitespace = config.isTrimWhitespace,
    )

    /** Decodes a protobuf group; [context] is not used by this codec. */
    override fun decode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup =
        protoDecoder.decode(messageGroup.messagesList).let { decoded ->
            ProtoMessageGroup.newBuilder().addAllMessages(decoded).build()
        }

    /** Decodes a transport group; [context] is not used by this codec. */
    override fun decode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup =
        MessageGroup(transportDecoder.decode(messageGroup.messages))

    companion object {
        private val LOGGER = KotlinLogging.logger {}
    }
}
/**
 * [AbstractDecoder] binding for the protobuf message model: it tells the shared CSV decoding logic
 * how to read a protobuf [RawMessage] and how to build the resulting parsed [Message].
 */
class ProtoDecoder(
    charset: Charset,
    csvDelimiter: Char,
    defaultHeader: Array<String>?,
    publishHeader: Boolean,
    validateLength: Boolean,
    trimWhitespace: Boolean
) : AbstractDecoder<AnyMessage, RawMessage, Message, Value>(
    charset,
    csvDelimiter,
    defaultHeader,
    publishHeader,
    validateLength,
    trimWhitespace,
) {

    override val RawMessage.messageMetadata: Map<String, String> get() = metadata.propertiesMap
    override val RawMessage.messageSessionAlias: String get() = sessionAlias ?: error("No session alias in message")

    override val RawMessage.rawBody: ByteArray get() = body.toByteArray()
    override val RawMessage.messageProtocol: String get() = metadata.protocol
    override val RawMessage.logId: String get() = this.id.logId
    override val RawMessage.logData: String get() = toJson()
    override val AnyMessage.isParsed: Boolean get() = hasMessage()
    override val AnyMessage.isRaw: Boolean get() = hasRawMessage()
    override val AnyMessage.asRaw: RawMessage get() = rawMessage

    /**
     * Builds a parsed message with the given [body] fields, wrapped into an [AnyMessage].
     *
     * The metadata is derived from [sourceMessage] with [outputMessageType] as the message type and
     * [currentIndex] appended to the id's subsequence. The parent event id is copied only when the
     * source message actually has one.
     */
    override fun createParsedMessage(
        sourceMessage: RawMessage,
        outputMessageType: String,
        body: Map<String, Value>,
        currentIndex: Int
    ): AnyMessage {
        val builder = Message.newBuilder().putAllFields(body)

        // Setting a protobuf message field, even to its default instance, marks it as present.
        // Copy the parent event id only when the source has one, so that hasParentEventId()
        // on the result mirrors the source message.
        if (sourceMessage.hasParentEventId()) {
            builder.setParentEventId(sourceMessage.parentEventId)
        }

        // The message type is set together with the rest of the metadata.
        setMetadata(sourceMessage.metadata, builder, outputMessageType, currentIndex)
        return AnyMessage.newBuilder().setMessage(builder).build()
    }

    override fun String.toFieldValue(): Value = toValue()
    override fun Array<String>.toFieldValue(): Value = toValue()

    /**
     * Fills [messageBuilder]'s metadata from [originalMetadata]: the id is copied with
     * [currentIndex] appended to its subsequence, the properties are copied as is and the
     * message type is set to [messageType].
     */
    private fun setMetadata(
        originalMetadata: RawMessageMetadata,
        messageBuilder: Message.Builder,
        messageType: String,
        currentIndex: Int
    ) {
        messageBuilder.setMetadata(
            MessageMetadata.newBuilder()
                .setId(
                    MessageID.newBuilder(originalMetadata.id)
                        .setTimestamp(originalMetadata.id.timestamp)
                        .addSubsequence(currentIndex)
                        .build()
                )
                .putAllProperties(originalMetadata.propertiesMap)
                .setMessageType(messageType)
        )
    }
}
/**
 * [AbstractDecoder] binding for the transport message model: it tells the shared CSV decoding
 * logic how to read a transport [RawMessage] and how to build the resulting [ParsedMessage].
 */
class TransportDecoder(
    charset: Charset,
    csvDelimiter: Char,
    defaultHeader: Array<String>?,
    publishHeader: Boolean,
    validateLength: Boolean,
    trimWhitespace: Boolean
) : AbstractDecoder<Message<*>, RawMessage, ParsedMessage, Any>(
    charset,
    csvDelimiter,
    defaultHeader,
    publishHeader,
    validateLength,
    trimWhitespace,
) {

    // Accessors for the raw message parts used by the shared decoding logic.
    override val RawMessage.messageMetadata: Map<String, String> get() = metadata
    override val RawMessage.messageSessionAlias: String get() = this.id.sessionAlias
    override val RawMessage.rawBody: ByteArray get() = this.body.toByteArray()
    override val RawMessage.messageProtocol: String get() = this.protocol

    // Representations used in log and error messages.
    override val RawMessage.logId: String get() = this.id.logId
    override val RawMessage.logData: String get() = toString()

    // Classification of the messages in an incoming group.
    override val Message<*>.isParsed: Boolean get() = this is ParsedMessage
    override val Message<*>.isRaw: Boolean get() = this is RawMessage
    override val Message<*>.asRaw: RawMessage get() = this as RawMessage

    /**
     * Builds a parsed message of type [outputMessageType] carrying [body]. The id is the source
     * id with [currentIndex] appended to its subsequence; event id and metadata are taken from
     * [sourceMessage] unchanged.
     */
    override fun createParsedMessage(
        sourceMessage: RawMessage,
        outputMessageType: String,
        body: Map<String, Any>,
        currentIndex: Int
    ): Message<*> = ParsedMessage(
        id = sourceMessage.id.toBuilder().addSubsequence(currentIndex).build(),
        eventId = sourceMessage.eventId,
        type = outputMessageType,
        metadata = sourceMessage.metadata,
        body = body
    )

    // Transport bodies carry plain values, so fields are passed through without wrapping.
    override fun String.toFieldValue(): String = this
    override fun Array<String>.toFieldValue(): Any = this
}
com.exactpro.th2.common.grpc.Message; import com.exactpro.th2.common.grpc.MessageGroup; @@ -45,203 +31,225 @@ import com.exactpro.th2.common.grpc.RawMessageMetadata; import com.exactpro.th2.common.grpc.Value; import com.google.protobuf.ByteString; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; class TestCsvCodec { + private final IReportingContext reportingContext = new ReportingContext(); + private static final String TEST_SESSION = "test-session"; + @Nested class TestPositive { @Test -void decodeArrayWithDifferentLength() throws IOException { + void decodeArrayWithDifferentLength() { CsvCodecConfiguration configuration = new CsvCodecConfiguration(); configuration.setValidateLength(false); configuration.setPublishHeader(true); - CsvCodec codec = createCodec(configuration); - MessageGroup group = MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B, , ,", "1,2,3,4")) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(2, message); - - assertAll( - () -> assertAll("Current message: " + header, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertFieldValueEquals(header, "Header", 
listValue("A", "B", "", "", "")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> { - var listValues = getListValue(message, "B", () -> "No field B. " + message); - assertEquals(3, listValues.length); - assertEquals("2", listValues[0]); - assertEquals("3", listValues[1]); - assertEquals("4", listValues[2]); - } - ) - ); + try(CsvCodec codec = createCodec(configuration)) { + MessageGroup group = MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B, , ,", "1,2,3,4")) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(2, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertFieldValueEquals(header, listValue("A", "B", "", "", "")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> { + var listValues = getListValue(message, "B", () -> "No field B. 
" + message); + assertEquals(3, listValues.length); + assertEquals("2", listValues[0]); + assertEquals("3", listValues[1]); + assertEquals("4", listValues[2]); + } + ) + ); + } } @Test - void decodeArrayInEnd() throws IOException { + void decodeArrayInEnd() { CsvCodecConfiguration configuration = new CsvCodecConfiguration(); configuration.setValidateLength(false); configuration.setPublishHeader(true); - CsvCodec codec = createCodec(configuration); - MessageGroup group = MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B,C ,", "1,2,3")) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(3, message); - - assertAll( - () -> assertAll("Current message: " + header, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertFieldValueEquals(header, "Header", listValue("A", "B", "C", "")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), - () -> { - var listValues = getListValue(message, "C", () -> "No field C. 
" + message); - assertEquals(1, listValues.length); - assertEquals("3", listValues[0]); - } - ) - ); + try(CsvCodec codec = createCodec(configuration)) { + MessageGroup group = MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B,C ,", "1,2,3")) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertFieldValueEquals(header, listValue("A", "B", "C", "")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> { + var listValues = getListValue(message, "C", () -> "No field C. 
" + message); + assertEquals(1, listValues.length); + assertEquals("3", listValues[0]); + } + ) + ); + } } @Test - void decodeArrayInMiddle() throws IOException { - CsvCodec codec = createCodec(); - MessageGroup group = MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B, ,C", "1,2,3,4")) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(3, message); - - assertAll( - () -> assertAll("Current message: " + header, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertFieldValueEquals(header, "Header", listValue("A", "B", "", "C")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> { - var listValues = getListValue(message, "B", () -> "No field B. " + message); - assertEquals(2, listValues.length); - assertEquals("2", listValues[0]); - assertEquals("3", listValues[1]); - }, - () -> assertEquals("4", getFieldValue(message, "C", () -> "No field C. 
" + message)) - ) - ); + void decodeArrayInMiddle() { + try(CsvCodec codec = createCodec()) { + MessageGroup group = MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B, ,C", "1,2,3,4")) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertFieldValueEquals(header, listValue("A", "B", "", "C")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> { + var listValues = getListValue(message, "B", () -> "No field B. " + message); + assertEquals(2, listValues.length); + assertEquals("2", listValues[0]); + assertEquals("3", listValues[1]); + }, + () -> assertEquals("4", getFieldValue(message, "C", () -> "No field C. 
" + message)) + ) + ); + } } @Test void decodesDataAndSkipsHeader() { - CsvCodec codec = createCodec(); - MessageGroup group = MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B,C", "1,2,3")) - .build(); - - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(3, message); - - assertAll( - () -> assertAll("Current message: " + header, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertFieldValueEquals(header, "Header", listValue("A", "B", "C")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), - () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. 
" + message)) - ) - ); + try(CsvCodec codec = createCodec()) { + MessageGroup group = MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B,C", "1,2,3")) + .build(); + + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertFieldValueEquals(header, listValue("A", "B", "C")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. 
" + message)) + ) + ); + } } @Test void skipsHeaderPublishing() { final var config = new CsvCodecConfiguration(); config.setPublishHeader(false); - CsvCodec codec = createCodec(config); - MessageGroup group = MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B,C", "1,2,3")) - .build(); - - MessageGroup value = codec.decode(group); - assertEquals(1, value.getMessagesCount()); - - Message message = getMessage(value, 0); - assertFieldCount(3, message); - - assertAll( - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), - () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message)) - ) - ); + try(CsvCodec codec = createCodec(config)) { + MessageGroup group = MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B,C", "1,2,3")) + .build(); + + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(1, value.getMessagesCount()); + + Message message = getMessage(value, 0); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. 
" + message)) + ) + ); + } } @Test @@ -250,213 +258,265 @@ void settingMessageTypeFromIncomingMessage() { final var config = new CsvCodecConfiguration(); config.setPublishHeader(false); - CsvCodec codec = createCodec(config); - final var csvMessage = createCsvMessage(Map.of("th2.csv.override_message_type", customType), "A,B,C", "1,2,3"); - - MessageGroup group = MessageGroup.newBuilder() - .addMessages(csvMessage) - .build(); - - MessageGroup value = codec.decode(group); - assertEquals(1, value.getMessagesCount()); - - Message message = getMessage(value, 0); - assertFieldCount(3, message); - - assertAll( - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals(customType, message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), - () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message)) - ) - ); + try(CsvCodec codec = createCodec(config)) { + final var csvMessage = createCsvMessage(Map.of("th2.csv.override_message_type", customType), "A,B,C", "1,2,3"); + + MessageGroup group = MessageGroup.newBuilder() + .addMessages(csvMessage) + .build(); + + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(1, value.getMessagesCount()); + + Message message = getMessage(value, 0); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals(customType, message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. 
" + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message)) + ) + ); + } } @Test void trimsEndOfTheLine() { - CsvCodec codec = createCodec(); - - MessageGroup group = MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B,C\n\r1,2,3\n")) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(3, message); - assertAll( - () -> assertAll("Current message: " + header, - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> assertFieldValueEquals(header, "Header", listValue("A", "B", "C")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), - () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. 
" + message)) - ) - ); + try(CsvCodec codec = createCodec()) { + + MessageGroup group = MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B,C\n\r1,2,3\n")) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(3, message); + assertAll( + () -> assertAll("Current message: " + header, + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> assertFieldValueEquals(header, listValue("A", "B", "C")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. 
" + message)) + ) + ); + } } @Test void decodesDataUsingDefaultHeader() { - CsvCodecConfiguration configuration = new CsvCodecConfiguration(); - configuration.setDefaultHeader(List.of("A", "B", "C")); - CsvCodec codec = createCodec(configuration); - - MessageGroup group = MessageGroup.newBuilder() - .addMessages( - createCsvMessage("1,2,3") - ) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(1, value.getMessagesCount()); - - Message message = getMessage(value, 0); - assertFieldCount(3, message); - assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. " + message)), - () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message)) - ); + CsvCodecConfiguration config = new CsvCodecConfiguration(); + config.setDefaultHeader(List.of("A", "B", "C")); + try (CsvCodec codec = createCodec(config)) { + + MessageGroup group = MessageGroup.newBuilder() + .addMessages( + createCsvMessage("1,2,3") + ) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(1, value.getMessagesCount()); + + Message message = getMessage(value, 0); + assertFieldCount(3, message); + assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("2", getFieldValue(message, "B", () -> "No field B. 
" + message)), + () -> assertEquals("3", getFieldValue(message, "C", () -> "No field C. " + message)) + ); + } } @Test void decodesDataWithEscapedCharacters() { - CsvCodec codec = createCodec(); - - MessageGroup group = MessageGroup.newBuilder() - .addMessages( - createCsvMessage("A,B", "\"1,2\",\"\"\"value\"\"\"") - ) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(2, message); - - assertAll( - () -> assertAll("Current message: " + header, - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> assertFieldValueEquals(header, "Header", listValue("A", "B")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("\"value\"", getFieldValue(message, "B", () -> "No field B. 
" + message)) - ) - ); + try(CsvCodec codec = createCodec()) { + + MessageGroup group = MessageGroup.newBuilder() + .addMessages( + createCsvMessage("A,B", "\"1,2\",\"\"\"value\"\"\"") + ) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(2, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> assertFieldValueEquals(header, listValue("A", "B")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("\"value\"", getFieldValue(message, "B", () -> "No field B. 
" + message)) + ) + ); + } } @Test void decodesDataCustomDelimiter() { - CsvCodecConfiguration configuration = new CsvCodecConfiguration(); - configuration.setDelimiter(';'); - configuration.setPublishHeader(true); - CsvCodec codec = createCodec(configuration); - - MessageGroup group = MessageGroup.newBuilder() - .addMessages( - createCsvMessage("A;B", "1,2;3") - ) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(2, message); - - assertAll( - () -> assertAll("Current message: " + header, - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> assertFieldValueEquals(header, "Header", listValue("A", "B")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("3", getFieldValue(message, "B", () -> "No field B. 
" + message)) - ) - ); + CsvCodecConfiguration config = new CsvCodecConfiguration(); + config.setDelimiter(';'); + config.setPublishHeader(true); + try(CsvCodec codec = createCodec(config)) { + + MessageGroup group = MessageGroup.newBuilder() + .addMessages( + createCsvMessage("A;B", "1,2;3") + ) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(2, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> assertFieldValueEquals(header, listValue("A", "B")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1,2", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("3", getFieldValue(message, "B", () -> "No field B. 
" + message)) + ) + ); + } + } + + @Test + void trimsWhitespaces() { + final var config = new CsvCodecConfiguration(); + config.setPublishHeader(true); + config.setTrimWhitespace(true); + try(CsvCodec codec = createCodec(config)) { + + MessageGroup group = MessageGroup.newBuilder() + .addMessages( + createCsvMessage("A , B , C", "1 , , 3 3") + ) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> assertFieldValueEquals(header, listValue("A", "B", "C")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), + () -> assertEquals("", getFieldValue(message, "B", () -> "No field B. " + message)), + () -> assertEquals("3 3", getFieldValue(message, "C", () -> "No field C. 
" + message)) + ) + ); + } } @Test - void trimsWhitespacesDuringDecoding() { - CsvCodec codec = createCodec(); - - MessageGroup group = MessageGroup.newBuilder() - .addMessages( - createCsvMessage("A, B, C", "1, , 3 3") - ) - .build(); - MessageGroup value = codec.decode(group); - assertEquals(2, value.getMessagesCount()); - - Message header = getMessage(value, 0); - assertFieldCount(1, header); - Message message = getMessage(value, 1); - assertFieldCount(3, message); - - assertAll( - () -> assertAll("Current message: " + header, - () -> { - assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); - assertEquals(1, header.getMetadata().getId().getSubsequence(0)); - }, - () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), - () -> assertFieldValueEquals(header, "Header", listValue("A", "B", "C")) - ), - () -> assertAll("Current message: " + message, - () -> { - assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); - assertEquals(2, message.getMetadata().getId().getSubsequence(0)); - assertEquals("Csv_Message", message.getMetadata().getMessageType()); - }, - () -> assertEquals("1", getFieldValue(message, "A", () -> "No field A. " + message)), - () -> assertEquals("", getFieldValue(message, "B", () -> "No field B. " + message)), - () -> assertEquals("3 3", getFieldValue(message, "C", () -> "No field C. 
" + message)) - ) - ); + void notTrimsWhitespaces() { + final var config = new CsvCodecConfiguration(); + config.setPublishHeader(true); + config.setTrimWhitespace(false); + try(CsvCodec codec = createCodec(config)) { + + MessageGroup group = MessageGroup.newBuilder() + .addMessages( + createCsvMessage("A , B , C", "1 , , 3 3") + ) + .build(); + MessageGroup value = codec.decode(group, reportingContext); + assertEquals(2, value.getMessagesCount()); + + Message header = getMessage(value, 0); + assertFieldCount(1, header); + Message message = getMessage(value, 1); + assertFieldCount(3, message); + + assertAll( + () -> assertAll("Current message: " + header, + () -> { + assertEquals(1, header.getMetadata().getId().getSubsequenceCount()); + assertEquals(1, header.getMetadata().getId().getSubsequence(0)); + }, + () -> assertEquals("Csv_Header", header.getMetadata().getMessageType()), + () -> assertFieldValueEquals(header, listValue("A ", " B ", " C")) + ), + () -> assertAll("Current message: " + message, + () -> { + assertEquals(1, message.getMetadata().getId().getSubsequenceCount()); + assertEquals(2, message.getMetadata().getId().getSubsequence(0)); + assertEquals("Csv_Message", message.getMetadata().getMessageType()); + }, + () -> assertEquals("1 ", getFieldValue(message, "A ", () -> "No field A. " + message)), + () -> assertEquals(" ", getFieldValue(message, " B ", () -> "No field B. " + message)), + () -> assertEquals(" 3 3", getFieldValue(message, " C", () -> "No field C. 
" + message)) + ) + ); + } } } @@ -464,33 +524,40 @@ void trimsWhitespacesDuringDecoding() { class TestNegative { @Test void reportsErrorIfNotDataFound() { - CsvCodec codec = createCodec(); - Assertions.assertThrows(DecodeException.class, () -> - codec.decode(MessageGroup.newBuilder().addMessages(createCsvMessage("")).build())); + try(CsvCodec codec = createCodec()) { + assertThrows(DecodeException.class, () -> + codec.decode(MessageGroup.newBuilder().addMessages(createCsvMessage("")).build(), reportingContext)); + } } @Test void reportsErrorIfRawDataIsEmpty() { - CsvCodec codec = createCodec(); - - Assertions.assertThrows(DecodeException.class, () -> - codec.decode(MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B,C")) - .addMessages(createCsvMessage("")) - .build()) - ); + try(CsvCodec codec = createCodec()) { + + assertThrows(DecodeException.class, () -> + codec.decode( + MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B,C")) + .addMessages(createCsvMessage("")) + .build(), + reportingContext) + ); + } } @Test void reportsErrorIfDefaultHeaderAndDataHaveDifferentSize() { - CsvCodecConfiguration configuration = new CsvCodecConfiguration(); - configuration.setDefaultHeader(List.of("A", "B")); - CsvCodec codec = createCodec(configuration); - - assertThrows(DecodeException.class, () -> - codec.decode(MessageGroup.newBuilder() - .addMessages(createCsvMessage("1,2,3")).build()) - ); + CsvCodecConfiguration config = new CsvCodecConfiguration(); + config.setDefaultHeader(List.of("A", "B")); + try(CsvCodec codec = createCodec(config)) { + + assertThrows(DecodeException.class, () -> + codec.decode( + MessageGroup.newBuilder().addMessages(createCsvMessage("1,2,3")).build(), + reportingContext + ) + ); + } } } @@ -512,7 +579,10 @@ private AnyMessage createCsvMessage(Map metadataProps, String... 
Builder builder = RawMessage.newBuilder() .setBody(ByteString.copyFrom(String.join(StringUtils.LF, data).getBytes(StandardCharsets.UTF_8))); RawMessageMetadata.Builder metadataBuilder = RawMessageMetadata.newBuilder() - .setId(MessageID.newBuilder().setSequence(System.nanoTime()).build()) + .setId(MessageID.newBuilder() + .setSequence(System.nanoTime()) + .setConnectionId(ConnectionID.newBuilder().setSessionAlias(TEST_SESSION).build()) + .build()) .putAllProperties(metadataProps); builder.setMetadata(metadataBuilder.build()); return AnyMessage.newBuilder().setRawMessage(builder).build(); @@ -534,9 +604,9 @@ private String[] getListValue(Message message, String fieldName, Supplier "Unexpected value in " + fieldName + " field. Message: " + message); + private void assertFieldValueEquals(Message message, Value expectedValue) { + Value actualValue = message.getFieldsMap().get("Header"); + assertEquals(expectedValue, actualValue, () -> "Unexpected value in " + "Header" + " field. Message: " + message); } private static Value listValue(String... values) { diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt new file mode 100644 index 0000000..0ae5fcb --- /dev/null +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt @@ -0,0 +1,73 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.exactpro.th2.codec.csv + +import com.exactpro.th2.codec.api.IReportingContext +import com.exactpro.th2.codec.api.impl.ReportingContext +import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction +import io.netty.buffer.Unpooled +import org.apache.commons.lang3.StringUtils +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import java.lang.String.join +import java.nio.charset.StandardCharsets +import java.time.Instant + +class TestCsvCodecTransport { + private val reportingContext: IReportingContext = ReportingContext() + + @Test + fun decodesDataUsingDefaultHeader() { + val configuration = CsvCodecConfiguration() + configuration.defaultHeader = listOf("A", "B", "C") + val codec = CsvCodec(configuration) + val group = MessageGroup(listOf(createCsvMessage("1,2,3"))) + val decodedGroup = codec.decode(group,reportingContext) + + Assertions.assertEquals(1, decodedGroup.messages.size) + val message = decodedGroup.messages[0] as ParsedMessage + + message.body.size + + Assertions.assertEquals(3, message.body.size) + Assertions.assertEquals(1, message.id.subsequence.size) + Assertions.assertEquals(1, message.id.subsequence[0]) + Assertions.assertEquals("Csv_Message", message.type) + Assertions.assertEquals("1", message.body["A"]) + Assertions.assertEquals("2", message.body["B"]) + Assertions.assertEquals("3", message.body["C"]) + } + + private fun createCsvMessage(vararg data: String): RawMessage { + val body = join(StringUtils.LF, *data).toByteArray(StandardCharsets.UTF_8) + return RawMessage( + id = MessageId( + 
"alias_01", + Direction.INCOMING, + System.nanoTime(), + Instant.now() + ), + metadata = mapOf(), + body = Unpooled.wrappedBuffer(body) + ) + } +} \ No newline at end of file diff --git a/supressions.xml b/supressions.xml new file mode 100644 index 0000000..0c032ab --- /dev/null +++ b/supressions.xml @@ -0,0 +1,10 @@ + + + + + + + ^pkg:maven/com\.exactpro\.th2/grpc-.*@.*$ + cpe:/a:grpc:grpc + + \ No newline at end of file