diff --git a/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml new file mode 100644 index 0000000..9c3e5df --- /dev/null +++ b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml @@ -0,0 +1,22 @@ +name: Build and release Java distributions to sonatype. + +on: + push: + tags: + - \d+.\d+.\d+-dev + +jobs: + build: + uses: th2-net/.github/.github/workflows/compound-java.yml@main + with: + build-target: 'Sonatype,Docker' + runsOn: ubuntu-latest + gradleVersion: '7' + docker-username: ${{ github.actor }} + devRelease: true + secrets: + sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} + sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} + sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} + sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} + docker-password: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/README.md b/README.md index 320bf83..abf8014 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,18 @@ -# Csv codec (5.0.0) +# Csv codec (5.1.0) + ## Description -Designed for decode csv raw messages from csv reader to the parsed messages. -It is based on [th2-codec](https://github.com/th2-net/th2-codec). -You can find additional information [here](https://github.com/th2-net/th2-codec/blob/master/README.md) + +Designed for decode csv raw messages from csv reader to the parsed messages. It is based +on [th2-codec](https://github.com/th2-net/th2-codec). You can find additional +information [here](https://github.com/th2-net/th2-codec/blob/master/README.md) ## Decoding -The codec decodes each raw message in the received batch. -Each raw message might contain several line in CSV format. -If the default header parameter is not set the codec trites the first line from the raw message as a header. -Otherwise, the default header will be used for decoding the rest of data. -Output message type is taken from `th2.csv.override_message_type` property in input message. -If the property missing, the default value (`Csv_Message`) for output message type is used. +The codec decodes each raw message in the received batch. Each raw message might contain several line in CSV format. If +the default header parameter is not set the codec trites the first line from the raw message as a header. Otherwise, the +default header will be used for decoding the rest of data. Output message type is taken +from `th2.csv.override_message_type` property in input message. If the property missing, the default +value (`Csv_Message`) for output message type is used. If no data was decoded from raw message, the message will be skipped, and an error event will be reported. @@ -19,7 +20,7 @@ If no data was decoded from raw message, the message will be skipped, and an err ## Decode Example -Simple example: +Simple example: ```text A, B, V, G @@ -52,31 +53,37 @@ into { "A": 1, "B": 2, - "V": [3, 4, 5], + "V": [ + 3, + 4, + 5 + ], "G": 6, "D": 7 } ``` ## Settings + Csv codec has the following parameters: ```yaml -default-header: [A, B, C] +default-header: [ A, B, C ] delimiter: ',' encoding: UTF-8 display-name: CodecCsv validate-length: true publish-header: false ``` + **default-header** - the default header for this codec. It will be used if no header found in the received batch. **delimiter** - the delimiter to split values in received data. The default value is `,`. **encoding** - the encoding for the received data. The default value is `UTF-8`. -**display-name** - the name to set in the root event sent to the event store. All errors during decoding will be attached to that root event. -The default value for the name is `CodecCsv`. +**display-name** - the name to set in the root event sent to the event store. All errors during decoding will be +attached to that root event. The default value for the name is `CodecCsv`. **validate-length** - check if csv have different count of values against header's count. @@ -94,7 +101,7 @@ spec: image-version: 4.0.0 custom-config: codecSettings: - default-header: [A, B, C] + default-header: [ A, B, C ] delimiter: ',' encoding: UTF-8 pins: @@ -108,29 +115,37 @@ spec: # decoder - name: in_codec_decode connection-type: mq - attributes: ['decoder_in', 'raw', 'subscribe'] + attributes: [ 'decoder_in', 'raw', 'subscribe' ] - name: out_codec_decode connection-type: mq - attributes: ['decoder_out', 'parsed', 'publish'] + attributes: [ 'decoder_out', 'parsed', 'publish' ] # encoder general (technical) - name: in_codec_general_encode connection-type: mq - attributes: ['general_encoder_in', 'parsed', 'subscribe'] + attributes: [ 'general_encoder_in', 'parsed', 'subscribe' ] - name: out_codec_general_encode connection-type: mq - attributes: ['general_encoder_out', 'raw', 'publish'] + attributes: [ 'general_encoder_out', 'raw', 'publish' ] # decoder general (technical) - name: in_codec_general_decode connection-type: mq - attributes: ['general_decoder_in', 'raw', 'subscribe'] + attributes: [ 'general_decoder_in', 'raw', 'subscribe' ] - name: out_codec_general_decode connection-type: mq - attributes: ['general_decoder_out', 'parsed', 'publish'] + attributes: [ 'general_decoder_out', 'parsed', 'publish' ] ``` ## Release notes +### 5.1.0 + ++ Supports th2 transport protocol ++ Updated bom:4.4.0 ++ Updated common:5.3.0 ++ Updated codec:5.3.0 + ### 5.0.0 + + Migrated to books&pages concept ### 4.1.0 @@ -168,7 +183,8 @@ spec: + reads dictionaries from the /var/th2/config/dictionary folder. + uses mq_router, grpc_router, cradle_manager optional JSON configs from the /var/th2/config folder -+ tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, default configuration ++ tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, + default configuration + update Cradle version. Introduce async API for storing events + removed gRPC event loop handling + fixed dictionary reading diff --git a/build.gradle b/build.gradle index 99ede53..befcf09 100644 --- a/build.gradle +++ b/build.gradle @@ -1,109 +1,32 @@ -plugins { - id 'java-library' - id 'application' - id 'com.palantir.docker' version '0.25.0' - id "org.owasp.dependencycheck" version "8.1.2" -} - -dependencyCheck { - format='HTML' -} - -ext { - sharedDir = file("${project.rootDir}/shared") -} - -group = 'com.exactpro.th2' -version = release_version - -sourceCompatibility = 11 -targetCompatibility = 11 - -repositories { - maven { - name 'MavenLocal' - url sharedDir +buildscript { + repositories { + gradlePluginPortal() + maven { + url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" + } } - mavenCentral() - - maven { - name 'Sonatype_snapshots' - url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' - } - - maven { - name 'Sonatype_releases' - url 'https://s01.oss.sonatype.org/content/repositories/releases/' - } - - mavenLocal() - - configurations.all { - resolutionStrategy.cacheChangingModulesFor 0, 'seconds' - resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' + dependencies { + classpath "com.exactpro.th2:th2-gradle-plugin:0.0.1-dev-5915772757-13a28ae-SNAPSHOT" } } -jar { - manifest { - attributes( - 'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})", - 'Specification-Title': '', - 'Specification-Vendor': 'Exactpro Systems LLC', - 'Implementation-Title': project.archivesBaseName, - 'Implementation-Vendor': 'Exactpro Systems LLC', - 'Implementation-Vendor-Id': 'com.exactpro', - 'Implementation-Version': project.version - ) - } -} +apply plugin: "com.exactpro.th2.common-conventions" +apply plugin: "com.exactpro.th2.docker-conventions" -dependencies { - api platform('com.exactpro.th2:bom:4.2.0') - implementation 'com.exactpro.th2:common:5.2.0-dev' - implementation 'com.exactpro.th2:codec:5.2.0-dev' +apply plugin: 'kotlin-kapt' - implementation "org.slf4j:slf4j-api" - - implementation 'net.sourceforge.javacsv:javacsv:2.0' - implementation 'org.jetbrains:annotations:23.0.0' - - compileOnly 'com.google.auto.service:auto-service-annotations:1.0.1' - annotationProcessor 'com.google.auto.service:auto-service:1.0.1' - - testImplementation 'org.mockito:mockito-core:3.5.15' - testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2' -} - -test { - useJUnitPlatform() -} - -application { - mainClassName 'com.exactpro.th2.codec.MainKt' -} - -applicationName = 'service' - -distTar { - archiveFileName.set("${applicationName}.tar") -} - -dockerPrepare { - dependsOn distTar -} +dependencies { + api platform("com.exactpro.th2:bom:4.5.0") + implementation "com.exactpro.th2:common:5.4.0-dev" + implementation "com.exactpro.th2:codec:5.3.0-dev" + implementation "com.exactpro.th2:common-utils:2.2.0-dev" -docker { - copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) -} + implementation "org.apache.commons:commons-lang3" + implementation "com.fasterxml.jackson.core:jackson-databind" -dependencyCheck { - formats=['SARIF', 'JSON', 'HTML'] - failBuildOnCVSS=5 + implementation "net.sourceforge.javacsv:javacsv:2.0" - analyzers { - assemblyEnabled = false - nugetconfEnabled = false - nodeEnabled = false - } + compileOnly "com.google.auto.service:auto-service-annotations:1.1.1" + annotationProcessor "com.google.auto.service:auto-service:1.1.1" + kapt "com.google.auto.service:auto-service:1.1.1" } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index c602a62..1f2e9e1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,2 @@ -release_version = 5.0.0 - -docker_image_name= +release_version=5.1.0 +app_main_class=com.exactpro.th2.codec.MainKt \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java index e0c860f..29fc44c 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java +++ b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java @@ -32,14 +32,7 @@ @AutoService(IPipelineCodecFactory.class) public class CodecFactory implements IPipelineCodecFactory { - public static final String PROTOCOL = "csv"; - private static final Set PROTOCOLS = Collections.singleton(PROTOCOL); - - @NotNull - @Override - public String getProtocol() { - return PROTOCOL; - } + private static final Set PROTOCOLS = Collections.singleton("csv"); @NotNull @Override @@ -73,4 +66,4 @@ public void init(@NotNull InputStream inputStream) { @Override public void close() { } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java deleted file mode 100644 index 08e72cc..0000000 --- a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.codec.csv; - -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.csvreader.CsvReader; -import com.exactpro.th2.codec.DecodeException; -import com.exactpro.th2.codec.api.IPipelineCodec; -import com.exactpro.th2.codec.api.IReportingContext; -import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration; -import com.exactpro.th2.common.grpc.AnyMessage; -import com.exactpro.th2.common.grpc.Message; -import com.exactpro.th2.common.grpc.Message.Builder; -import com.exactpro.th2.common.grpc.MessageGroup; -import com.exactpro.th2.common.grpc.MessageID; -import com.exactpro.th2.common.grpc.MessageMetadata; -import com.exactpro.th2.common.grpc.RawMessage; -import com.exactpro.th2.common.grpc.RawMessageMetadata; -import com.exactpro.th2.common.message.MessageUtils; -import com.exactpro.th2.common.value.ValueUtils; -import com.google.protobuf.ByteString; - -public class CsvCodec implements IPipelineCodec { - - private static final Logger LOGGER = LoggerFactory.getLogger(CsvCodec.class); - private static final String HEADER_MSG_TYPE = "Csv_Header"; - private static final String CSV_MESSAGE_TYPE = "Csv_Message"; - private static final String HEADER_FIELD_NAME = "Header"; - - private static final String OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type"; - - private final CsvCodecConfiguration configuration; - private final String[] defaultHeader; - private final Charset charset; - - public CsvCodec(CsvCodecConfiguration configuration) { - this.configuration = requireNonNull(configuration, "'Configuration' parameter"); - - List defaultHeader = configuration.getDefaultHeader(); - this.defaultHeader = defaultHeader == null - ? null - : defaultHeader.stream().map(String::trim).toArray(String[]::new); - if (this.defaultHeader != null && this.defaultHeader.length == 0) { - throw new IllegalArgumentException("Default header must not be empty"); - } - LOGGER.info("Default header: {}", configuration.getDefaultHeader()); - charset = Charset.forName(configuration.getEncoding()); - } - - @NotNull - @Override - public MessageGroup decode(@NotNull MessageGroup messageGroup) { - MessageGroup.Builder groupBuilder = MessageGroup.newBuilder(); - Collection errors = new ArrayList<>(); - for (AnyMessage anyMessage : messageGroup.getMessagesList()) { - if (anyMessage.hasMessage()) { - groupBuilder.addMessages(anyMessage); - continue; - } - if (!anyMessage.hasRawMessage()) { - LOGGER.error("Message should either have a raw or parsed message but has nothing: {}", anyMessage); - continue; - } - RawMessage rawMessage = anyMessage.getRawMessage(); - String protocol = rawMessage.getMetadata().getProtocol(); - if (!"".equals(protocol) && !"csv".equalsIgnoreCase(protocol)) { - LOGGER.error("Wrong protocol: message should have empty or 'csv' protocol but has {}", protocol); - continue; - } - - ByteString body = rawMessage.getBody(); - List data = decodeValues(body); - if (data.isEmpty()) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("The raw message does not contains any data: {}", MessageUtils.toJson(rawMessage)); - } - errors.add(new ErrorHolder("The raw message does not contains any data", rawMessage)); - continue; - } - decodeCsvData(errors, groupBuilder, rawMessage, data); - } - if (!errors.isEmpty()) { - throw createException(errors); - } - return groupBuilder.build(); - } - - @NotNull - @Override - public MessageGroup decode(@NotNull MessageGroup messageGroup, @NotNull IReportingContext iReportingContext) { - return decode(messageGroup); - } - - private RuntimeException createException(Collection errors) { - return new DecodeException( - "Cannot decode some messages: " + System.lineSeparator() + errors.stream() - .map(it -> "Message " + MessageUtils.toJson(it.originalMessage.getMetadata().getId()) + " cannot be decoded because " + it.text) - .collect(Collectors.joining(System.lineSeparator())) - ); - } - - @NotNull - @Override - public MessageGroup encode(@NotNull MessageGroup messageGroup) { - throw new UnsupportedOperationException("encode method is not implemented"); - } - - @NotNull - @Override - public MessageGroup encode(@NotNull MessageGroup messageGroup, @NotNull IReportingContext iReportingContext) { - throw new UnsupportedOperationException("encode method is not implemented"); - } - - @Override - public void close() { - } - - private void decodeCsvData(Collection errors, MessageGroup.Builder groupBuilder, RawMessage rawMessage, Iterable data) { - RawMessageMetadata originalMetadata = rawMessage.getMetadata(); - - final String outputMessageType = originalMetadata.getPropertiesOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE); - - int currentIndex = 0; - String[] header = defaultHeader; - for (String[] strings : data) { - currentIndex++; - - if (strings.length == 0) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("Empty raw at {} index (starts with 1). Data: {}", currentIndex, data); - } - errors.add(new ErrorHolder("Empty raw at " + currentIndex + " index (starts with 1)", rawMessage)); - continue; - } - - trimEachElement(strings); - - if (header == null) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Set header to: " + Arrays.toString(strings)); - } - header = strings; - - if (configuration.isPublishHeader()) { - AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder(); - Builder headerMsg = Message.newBuilder(); - // Not set message type - setMetadata(originalMetadata, headerMsg, HEADER_MSG_TYPE, currentIndex); - headerMsg.putFields(HEADER_FIELD_NAME, ValueUtils.toValue(strings)); - messageBuilder.setMessage(headerMsg); - } - - continue; - } - - if (strings.length != header.length && configuration.getValidateLength()) { - String msg = String.format("Wrong fields count in message. Expected count: %d; actual: %d; session alias: %s", - header.length, strings.length, originalMetadata.getId().getConnectionId().getSessionAlias()); - LOGGER.error(msg); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(rawMessage.toString()); - } - errors.add(new ErrorHolder(msg, strings, rawMessage)); - } - - AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder(); - - Builder builder = Message.newBuilder(); - // Not set message type - setMetadata(originalMetadata, builder, outputMessageType, currentIndex); - - int headerLength = header.length; - int rowLength = strings.length; - for (int i = 0; i < headerLength && i < rowLength; ) { - int extraLength = getHeaderArrayLength(header, i); - if (extraLength == 1) { - builder.putFields(header[i], ValueUtils.toValue(strings[i])); - i++; - } else { - String[] values = copyArray(strings, i, i+extraLength); - builder.putFields(header[i], ValueUtils.toValue(values)); - i+=extraLength; - } - } - - messageBuilder.setMessage(builder); - } - } - - public static String [] copyArray(String [] original, int from, int to){ - String [] copyArr = new String[Integer.min(to, original.length) - from]; - for (int i = from; i < to && i < original.length; i++){ - copyArr[i-from] = original[i]; - } - return copyArr; - } - - private int getHeaderArrayLength(String[] header, int index) { - int length = 1; - for (int i = index + 1; i < header.length && header[i].isEmpty(); i++) { - length++; - } - return length; - } - - private void setMetadata(RawMessageMetadata originalMetadata, Message.Builder messageBuilder, String messageType, int currentIndex) { - messageBuilder.setMetadata(MessageMetadata - .newBuilder() - .setId(MessageID.newBuilder(originalMetadata.getId()) - .setTimestamp(originalMetadata.getId().getTimestamp()) - .addSubsequence(currentIndex) - .build()) - - .putAllProperties(originalMetadata.getPropertiesMap()) - .setMessageType(messageType) - ); - } - - private List decodeValues(ByteString body) { - try (InputStream in = new ByteArrayInputStream(body.toByteArray())) { - CsvReader reader = new CsvReader(in, configuration.getDelimiter(), charset); - try { - List result = new ArrayList<>(); - while (reader.readRecord()) { - result.add(reader.getValues()); - } - return result; - } finally { - reader.close(); - } - } catch (IOException e) { - throw new RuntimeException("cannot read data from raw bytes", e); - } - } - - private static class ErrorHolder { - public final String text; - public final String[] currentRow; - public final RawMessage originalMessage; - - private ErrorHolder(String text, String[] currentRow, RawMessage originalMessage) { - this.text = text; - this.currentRow = currentRow; - this.originalMessage = originalMessage; - } - - private ErrorHolder(String text, RawMessage originalMessage) { - this(text, null, originalMessage); - } - } - - private void trimEachElement(String[] elements) { - for (int i = 0; i < elements.length; i++) { - String element = elements[i]; - elements[i] = element.trim(); - } - } -} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt new file mode 100644 index 0000000..6650354 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/AbstractDecoder.kt @@ -0,0 +1,199 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.csvreader.CsvReader +import com.exactpro.th2.codec.DecodeException +import mu.KotlinLogging +import java.io.ByteArrayInputStream +import java.io.IOException +import java.nio.charset.Charset +import kotlin.math.min + +abstract class AbstractDecoder( + private val charset: Charset, + private val csvDelimiter: Char, + private val defaultHeader: Array?, + private val publishHeader: Boolean, + private val validateLength: Boolean +) { + protected abstract val RAW_MESSAGE.messageMetadata: Map + protected abstract val RAW_MESSAGE.messageSessionAlias: String + protected abstract val RAW_MESSAGE.rawBody: ByteArray + protected abstract val RAW_MESSAGE.messageProtocol: String + protected abstract val RAW_MESSAGE.logId: String + protected abstract val RAW_MESSAGE.logData: String + protected abstract val ANY_MESSAGE.isParsed: Boolean + protected abstract val ANY_MESSAGE.isRaw: Boolean + protected abstract val ANY_MESSAGE.asRaw: RAW_MESSAGE + + protected abstract fun createParsedMessage(sourceMessage: RAW_MESSAGE, outputMessageType: String, body: Map, currentIndex: Int): ANY_MESSAGE + protected abstract fun String.toFieldValue(): BODY_FIELD_VALUE + protected abstract fun Array.toFieldValue(): BODY_FIELD_VALUE + + fun decode(messageGroup: List): List { + val groupBuilder = mutableListOf() // ProtoMessageGroup.newBuilder() + val errors: MutableCollection> = mutableListOf() + for (anyMessage in messageGroup) { + if (anyMessage.isParsed) { + groupBuilder += anyMessage + continue + } + if (!anyMessage.isRaw) { + LOGGER.error { "Message should either have a raw or parsed message but has nothing: $anyMessage" } + continue + } + val rawMessage = anyMessage.asRaw + val protocol = rawMessage.messageProtocol + if ("" != protocol && !"csv".equals(protocol, ignoreCase = true)) { + LOGGER.error { "Wrong protocol: message should have empty or 'csv' protocol but has $protocol" } + continue + } + val data = decodeValues(rawMessage.rawBody) + if (data.isEmpty()) { + LOGGER.error { "The raw message does not contains any data: ${rawMessage.logData}" } + errors.add(ErrorHolder("The raw message does not contains any data", rawMessage)) + continue + } + + decodeCsvData(errors, groupBuilder, rawMessage, data) + } + if (errors.isNotEmpty()) { + throw DecodeException( + "Cannot decode some messages:\n" + errors.joinToString("\n") { + "Message ${it.originalMessage.logId} cannot be decoded because ${it.text}" + } + ) + } + return groupBuilder + } + + private fun decodeCsvData( + errors: MutableCollection>, + groupBuilder: MutableList, + rawMessage: RAW_MESSAGE, + data: Iterable> + ) { + val originalMetadata = rawMessage.messageMetadata + val outputMessageType = originalMetadata.getOrDefault( + OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, + CSV_MESSAGE_TYPE + ) + var currentIndex = 0 + var header: Array? = defaultHeader + for (strings in data) { + currentIndex++ + if (strings.isEmpty()) { + LOGGER.error { "Empty raw at $currentIndex index (starts with 1). Data: $data" } + + errors.add(ErrorHolder("Empty raw at $currentIndex index (starts with 1)", rawMessage)) + continue + } + trimEachElement(strings) + if (header == null) { + LOGGER.debug { "Set header to: ${strings.contentToString()}" } + header = strings + if (publishHeader) { + //groupBuilder += createHeadersMessage(rawMessage, strings, currentIndex) + groupBuilder += createParsedMessage(rawMessage, + HEADER_MSG_TYPE, mapOf(HEADER_FIELD_NAME to strings.toFieldValue()), currentIndex) + } + continue + } + if (strings.size != header.size && validateLength) { + val msg = String.format( + "Wrong fields count in message. Expected count: %d; actual: %d; session alias: %s", + header.size, strings.size, rawMessage.messageSessionAlias + ) + LOGGER.error(msg) + LOGGER.debug { rawMessage.toString() } + errors.add(ErrorHolder(msg, rawMessage, strings)) + } + + val headerLength = header.size + val rowLength = strings.size + var i = 0 + val body = mutableMapOf() + while (i < headerLength && i < rowLength) { + val extraLength = getHeaderArrayLength(header, i) + if (extraLength == 1) { + body[header[i]] = strings[i].toFieldValue() + i++ + } else { + val values = copyArray(strings, i, i + extraLength) + body[header[i]] = values.toFieldValue() + i += extraLength + } + } + + groupBuilder += createParsedMessage(rawMessage, outputMessageType, body, currentIndex) + } + } + + private fun copyArray(original: Array, from: Int, to: Int) = original.copyOfRange(from, + min(to, original.size) + ) + + private fun getHeaderArrayLength(header: Array, index: Int): Int { + var length = 1 + var i = index + 1 + while (i < header.size && header[i].isEmpty()) { + length++ + i++ + } + return length + } + + private fun decodeValues(body: ByteArray): List> { + try { + ByteArrayInputStream(body).use { + val reader = CsvReader(it, csvDelimiter, charset) + return try { + val result: MutableList> = ArrayList() + while (reader.readRecord()) { + result.add(reader.values) + } + result + } finally { + reader.close() + } + } + } catch (e: IOException) { + throw RuntimeException("cannot read data from raw bytes", e) + } + } + + private class ErrorHolder( + val text: String, + val originalMessage: T, + val currentRow: Array = emptyArray() + ) + + private fun trimEachElement(elements: Array) { + for (i in elements.indices) { + elements[i] = elements[i].trim() + } + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + private const val HEADER_MSG_TYPE = "Csv_Header" + private const val CSV_MESSAGE_TYPE = "Csv_Message" + private const val HEADER_FIELD_NAME = "Header" + private const val OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt new file mode 100644 index 0000000..6b5a48c --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.exactpro.th2.codec.api.IPipelineCodec +import com.exactpro.th2.codec.api.IReportingContext +import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration +import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup +import mu.KotlinLogging +import java.nio.charset.Charset + +class CsvCodec(private val config: CsvCodecConfiguration) : IPipelineCodec { + + private val charset: Charset = Charset.forName(config.encoding) + private val defaultHeader = config.defaultHeader?.asSequence()?.map(String::trim)?.toList()?.toTypedArray()?.apply { + require(isNotEmpty()) { "Default header must not be empty" } + LOGGER.info { "Default header: ${config.defaultHeader}" } + } + + private val protoDecoder = ProtoDecoder(charset, config.delimiter, defaultHeader, config.isPublishHeader, config.validateLength) + private val transportDecoder = TransportDecoder(charset, config.delimiter, defaultHeader, config.isPublishHeader, config.validateLength) + + override fun decode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup { + val decodedMessages = protoDecoder.decode(messageGroup.messagesList) + return ProtoMessageGroup.newBuilder().addAllMessages(decodedMessages).build() + } + + override fun decode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup { + val decodedMessages = transportDecoder.decode(messageGroup.messages) + return MessageGroup(decodedMessages) + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/ProtoDecoder.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/ProtoDecoder.kt new file mode 100644 index 0000000..1588289 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/ProtoDecoder.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.exactpro.th2.common.grpc.Message +import com.exactpro.th2.common.grpc.AnyMessage +import com.exactpro.th2.common.grpc.RawMessage +import com.exactpro.th2.common.grpc.MessageMetadata +import com.exactpro.th2.common.grpc.RawMessageMetadata +import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.common.grpc.Value +import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.utils.message.id +import com.exactpro.th2.common.utils.message.logId +import com.exactpro.th2.common.utils.message.sessionAlias +import com.exactpro.th2.common.value.toValue +import java.nio.charset.Charset + +class ProtoDecoder( + charset: Charset, + csvDelimiter: Char, + defaultHeader: Array?, + publishHeader: Boolean, + validateLength: Boolean +) : AbstractDecoder(charset, csvDelimiter, defaultHeader, publishHeader, validateLength) { + + override val RawMessage.messageMetadata: Map get() = metadata.propertiesMap + override val RawMessage.messageSessionAlias: String get() = sessionAlias ?: error("No session alias in message") + + override val RawMessage.rawBody: ByteArray get() = body.toByteArray() + override val RawMessage.messageProtocol: String get() = metadata.protocol + override val RawMessage.logId: String get() = this.id.logId + override val RawMessage.logData: String get() = toJson() + override val AnyMessage.isParsed: Boolean get() = hasMessage() + override val AnyMessage.isRaw: Boolean get() = hasRawMessage() + override val AnyMessage.asRaw: RawMessage get() = rawMessage + + override fun createParsedMessage(sourceMessage: RawMessage, outputMessageType: String, body: Map, currentIndex: Int): AnyMessage { + val builder = Message.newBuilder() + .putAllFields(body) + .setParentEventId(sourceMessage.parentEventId) + + // Not set message type + setMetadata(sourceMessage.metadata, builder, outputMessageType, currentIndex) + return AnyMessage.newBuilder().setMessage(builder).build() + } + + override fun String.toFieldValue(): Value = toValue() + override fun Array.toFieldValue(): Value = toValue() + + private fun setMetadata( + originalMetadata: RawMessageMetadata, + messageBuilder: Message.Builder, + messageType: String, + currentIndex: Int + ) { + messageBuilder.setMetadata( + MessageMetadata.newBuilder() + .setId( + MessageID.newBuilder(originalMetadata.id) + .setTimestamp(originalMetadata.id.timestamp) + .addSubsequence(currentIndex) + .build() + ) + .putAllProperties(originalMetadata.propertiesMap) + .setMessageType(messageType) + ) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/TransportDecoder.kt b/src/main/kotlin/com/exactpro/th2/codec/csv/TransportDecoder.kt new file mode 100644 index 0000000..b3323cf --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/csv/TransportDecoder.kt @@ -0,0 +1,62 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Message +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.toByteArray +import com.exactpro.th2.common.utils.message.transport.logId +import java.nio.charset.Charset + +class TransportDecoder( + charset: Charset, + csvDelimiter: Char, + defaultHeader: Array?, + publishHeader: Boolean, + validateLength: Boolean +) : AbstractDecoder, RawMessage, ParsedMessage, Any>(charset, csvDelimiter, defaultHeader, publishHeader, validateLength) { + + override val RawMessage.messageMetadata: Map get() = this.metadata + override val RawMessage.messageSessionAlias: String get() = id.sessionAlias + + override val RawMessage.rawBody: ByteArray get() = body.toByteArray() + override val RawMessage.messageProtocol: String get() = protocol + override val RawMessage.logId: String get() = id.logId + override val RawMessage.logData: String get() = this.toString() + override val Message<*>.isParsed: Boolean get() = this is ParsedMessage + override val Message<*>.isRaw: Boolean get() = this is RawMessage + override val Message<*>.asRaw: RawMessage get() = this as RawMessage + + override fun createParsedMessage( + sourceMessage: RawMessage, + outputMessageType: String, + body: Map, + currentIndex: Int + ): Message<*> { + return ParsedMessage( + id = sourceMessage.id.toBuilder().addSubsequence(currentIndex).build(), + eventId = sourceMessage.eventId, + type = outputMessageType, + metadata = sourceMessage.metadata, + body = body + ) + } + + override fun String.toFieldValue(): String = this + override fun Array.toFieldValue(): Any = this +} \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java index 7ec9080..dc56098 100644 --- a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodec.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,10 @@ import java.util.Map; import java.util.function.Supplier; +import com.exactpro.th2.codec.api.IReportingContext; +import com.exactpro.th2.codec.api.impl.ReportingContext; +import com.exactpro.th2.common.grpc.ConnectionID; import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -47,11 +49,14 @@ import com.google.protobuf.ByteString; class TestCsvCodec { + private final IReportingContext reportingContext = new ReportingContext(); + private static final String TEST_SESSION = "test-session"; + @Nested class TestPositive { @Test -void decodeArrayWithDifferentLength() throws IOException { + void decodeArrayWithDifferentLength() throws IOException { CsvCodecConfiguration configuration = new CsvCodecConfiguration(); configuration.setValidateLength(false); configuration.setPublishHeader(true); @@ -59,7 +64,7 @@ void decodeArrayWithDifferentLength() throws IOException { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B, , ,", "1,2,3,4")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -103,7 +108,7 @@ void decodeArrayInEnd() throws IOException { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B,C ,", "1,2,3")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -143,7 +148,7 @@ void decodeArrayInMiddle() throws IOException { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B, ,C", "1,2,3,4")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -185,7 +190,7 @@ void decodesDataAndSkipsHeader() { .addMessages(createCsvMessage("A,B,C", "1,2,3")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -224,7 +229,7 @@ void skipsHeaderPublishing() { .addMessages(createCsvMessage("A,B,C", "1,2,3")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(1, value.getMessagesCount()); Message message = getMessage(value, 0); @@ -257,7 +262,7 @@ void settingMessageTypeFromIncomingMessage() { .addMessages(csvMessage) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(1, value.getMessagesCount()); Message message = getMessage(value, 0); @@ -284,7 +289,7 @@ void trimsEndOfTheLine() { MessageGroup group = MessageGroup.newBuilder() .addMessages(createCsvMessage("A,B,C\n\r1,2,3\n")) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -324,7 +329,7 @@ void decodesDataUsingDefaultHeader() { createCsvMessage("1,2,3") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(1, value.getMessagesCount()); Message message = getMessage(value, 0); @@ -350,7 +355,7 @@ void decodesDataWithEscapedCharacters() { createCsvMessage("A,B", "\"1,2\",\"\"\"value\"\"\"") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -391,7 +396,7 @@ void decodesDataCustomDelimiter() { createCsvMessage("A;B", "1,2;3") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -429,7 +434,7 @@ void trimsWhitespacesDuringDecoding() { createCsvMessage("A, B, C", "1, , 3 3") ) .build(); - MessageGroup value = codec.decode(group); + MessageGroup value = codec.decode(group, reportingContext); assertEquals(2, value.getMessagesCount()); Message header = getMessage(value, 0); @@ -465,19 +470,35 @@ class TestNegative { @Test void reportsErrorIfNotDataFound() { CsvCodec codec = createCodec(); - Assertions.assertThrows(DecodeException.class, () -> - codec.decode(MessageGroup.newBuilder().addMessages(createCsvMessage("")).build())); + assertThrows(DecodeException.class, () -> + codec.decode(MessageGroup.newBuilder().addMessages(createCsvMessage("")).build(), reportingContext)); } @Test void reportsErrorIfRawDataIsEmpty() { CsvCodec codec = createCodec(); - Assertions.assertThrows(DecodeException.class, () -> - codec.decode(MessageGroup.newBuilder() - .addMessages(createCsvMessage("A,B,C")) - .addMessages(createCsvMessage("")) - .build()) + assertThrows(DecodeException.class, () -> + codec.decode( + MessageGroup.newBuilder() + .addMessages(createCsvMessage("A,B,C")) + .addMessages(createCsvMessage("")) + .build(), + reportingContext) + ); + } + + @Test + void reportsErrorIfDefaultHeaderAndDataHaveDifferentSize() { + CsvCodecConfiguration configuration = new CsvCodecConfiguration(); + configuration.setDefaultHeader(List.of("A", "B")); + CsvCodec codec = createCodec(configuration); + + assertThrows(DecodeException.class, () -> + codec.decode( + MessageGroup.newBuilder().addMessages(createCsvMessage("1,2,3")).build(), + reportingContext + ) ); } } @@ -500,7 +521,10 @@ private AnyMessage createCsvMessage(Map metadataProps, String... Builder builder = RawMessage.newBuilder() .setBody(ByteString.copyFrom(String.join(StringUtils.LF, data).getBytes(StandardCharsets.UTF_8))); RawMessageMetadata.Builder metadataBuilder = RawMessageMetadata.newBuilder() - .setId(MessageID.newBuilder().setSequence(System.nanoTime()).build()) + .setId(MessageID.newBuilder() + .setSequence(System.nanoTime()) + .setConnectionId(ConnectionID.newBuilder().setSessionAlias(TEST_SESSION).build()) + .build()) .putAllProperties(metadataProps); builder.setMetadata(metadataBuilder.build()); return AnyMessage.newBuilder().setRawMessage(builder).build(); diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt new file mode 100644 index 0000000..d79ed18 --- /dev/null +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt @@ -0,0 +1,76 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec.csv + +import com.exactpro.th2.codec.api.IReportingContext +import com.exactpro.th2.codec.api.impl.ReportingContext +import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction +import io.netty.buffer.Unpooled +import org.apache.commons.lang3.StringUtils +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import java.nio.charset.StandardCharsets +import java.time.Instant + +class TestCsvCodecTransport { + private val reportingContext: IReportingContext = ReportingContext() + + @Test + fun decodesDataUsingDefaultHeader() { + val configuration = CsvCodecConfiguration() + configuration.defaultHeader = listOf("A", "B", "C") + val codec = CsvCodec(configuration) + val group = MessageGroup(listOf(createCsvMessage("1,2,3"))) + val decodedGroup = codec.decode(group,reportingContext) + + Assertions.assertEquals(1, decodedGroup.messages.size) + val message = decodedGroup.messages[0] as ParsedMessage + + message.body.size + + Assertions.assertEquals(3, message.body.size) + Assertions.assertEquals(1, message.id.subsequence.size) + Assertions.assertEquals(1, message.id.subsequence[0]) + Assertions.assertEquals("Csv_Message", message.type) + Assertions.assertEquals("1", message.body["A"]) + Assertions.assertEquals("2", message.body["B"]) + Assertions.assertEquals("3", message.body["C"]) + } + + private fun createCsvMessage(vararg data: String): RawMessage { + return createCsvMessage(java.util.Map.of(), *data) + } + + private fun createCsvMessage(metadataProps: Map = mapOf(), vararg data: String): RawMessage { + val body = java.lang.String.join(StringUtils.LF, *data).toByteArray(StandardCharsets.UTF_8) + return RawMessage( + id = MessageId( + "alias_01", + Direction.INCOMING, + System.nanoTime(), + Instant.now() + ), + metadata = metadataProps, + body = Unpooled.wrappedBuffer(body) + ) + } +} \ No newline at end of file