diff --git a/README.md b/README.md index 320bf83..abf8014 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,18 @@ -# Csv codec (5.0.0) +# Csv codec (5.1.0) + ## Description -Designed for decode csv raw messages from csv reader to the parsed messages. -It is based on [th2-codec](https://github.com/th2-net/th2-codec). -You can find additional information [here](https://github.com/th2-net/th2-codec/blob/master/README.md) + +Designed to decode csv raw messages from the csv reader into parsed messages. It is based +on [th2-codec](https://github.com/th2-net/th2-codec). You can find additional +information [here](https://github.com/th2-net/th2-codec/blob/master/README.md) ## Decoding -The codec decodes each raw message in the received batch. -Each raw message might contain several line in CSV format. -If the default header parameter is not set the codec trites the first line from the raw message as a header. -Otherwise, the default header will be used for decoding the rest of data. -Output message type is taken from `th2.csv.override_message_type` property in input message. -If the property missing, the default value (`Csv_Message`) for output message type is used. +The codec decodes each raw message in the received batch. Each raw message might contain several lines in CSV format. If +the default header parameter is not set, the codec treats the first line from the raw message as a header. Otherwise, the +default header will be used for decoding the rest of the data. The output message type is taken +from the `th2.csv.override_message_type` property in the input message. If the property is missing, the default +value (`Csv_Message`) for the output message type is used. If no data was decoded from raw message, the message will be skipped, and an error event will be reported. 
@@ -19,7 +20,7 @@ If no data was decoded from raw message, the message will be skipped, and an err ## Decode Example -Simple example: +Simple example: ```text A, B, V, G @@ -52,31 +53,37 @@ into { "A": 1, "B": 2, - "V": [3, 4, 5], + "V": [ + 3, + 4, + 5 + ], "G": 6, "D": 7 } ``` ## Settings + Csv codec has the following parameters: ```yaml -default-header: [A, B, C] +default-header: [ A, B, C ] delimiter: ',' encoding: UTF-8 display-name: CodecCsv validate-length: true publish-header: false ``` + **default-header** - the default header for this codec. It will be used if no header found in the received batch. **delimiter** - the delimiter to split values in received data. The default value is `,`. **encoding** - the encoding for the received data. The default value is `UTF-8`. -**display-name** - the name to set in the root event sent to the event store. All errors during decoding will be attached to that root event. -The default value for the name is `CodecCsv`. +**display-name** - the name to set in the root event sent to the event store. All errors during decoding will be +attached to that root event. The default value for the name is `CodecCsv`. **validate-length** - check if csv have different count of values against header's count. 
@@ -94,7 +101,7 @@ spec: image-version: 4.0.0 custom-config: codecSettings: - default-header: [A, B, C] + default-header: [ A, B, C ] delimiter: ',' encoding: UTF-8 pins: @@ -108,29 +115,37 @@ spec: # decoder - name: in_codec_decode connection-type: mq - attributes: ['decoder_in', 'raw', 'subscribe'] + attributes: [ 'decoder_in', 'raw', 'subscribe' ] - name: out_codec_decode connection-type: mq - attributes: ['decoder_out', 'parsed', 'publish'] + attributes: [ 'decoder_out', 'parsed', 'publish' ] # encoder general (technical) - name: in_codec_general_encode connection-type: mq - attributes: ['general_encoder_in', 'parsed', 'subscribe'] + attributes: [ 'general_encoder_in', 'parsed', 'subscribe' ] - name: out_codec_general_encode connection-type: mq - attributes: ['general_encoder_out', 'raw', 'publish'] + attributes: [ 'general_encoder_out', 'raw', 'publish' ] # decoder general (technical) - name: in_codec_general_decode connection-type: mq - attributes: ['general_decoder_in', 'raw', 'subscribe'] + attributes: [ 'general_decoder_in', 'raw', 'subscribe' ] - name: out_codec_general_decode connection-type: mq - attributes: ['general_decoder_out', 'parsed', 'publish'] + attributes: [ 'general_decoder_out', 'parsed', 'publish' ] ``` ## Release notes +### 5.1.0 + ++ Supports th2 transport protocol ++ Updated bom:4.5.0 ++ Updated common:5.4.0-dev ++ Updated codec:5.3.0 + ### 5.0.0 + + Migrated to books&pages concept ### 4.1.0 @@ -168,7 +183,8 @@ spec: + reads dictionaries from the /var/th2/config/dictionary folder. + uses mq_router, grpc_router, cradle_manager optional JSON configs from the /var/th2/config folder -+ tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, default configuration ++ tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, + default configuration + update Cradle version. 
Introduce async API for storing events + removed gRPC event loop handling + fixed dictionary reading diff --git a/build.gradle b/build.gradle index 99ede53..362718a 100644 --- a/build.gradle +++ b/build.gradle @@ -1,109 +1,38 @@ -plugins { - id 'java-library' - id 'application' - id 'com.palantir.docker' version '0.25.0' - id "org.owasp.dependencycheck" version "8.1.2" -} - -dependencyCheck { - format='HTML' -} - -ext { - sharedDir = file("${project.rootDir}/shared") -} - -group = 'com.exactpro.th2' -version = release_version - -sourceCompatibility = 11 -targetCompatibility = 11 - -repositories { - maven { - name 'MavenLocal' - url sharedDir - } - mavenCentral() - - maven { - name 'Sonatype_snapshots' - url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' - } - - maven { - name 'Sonatype_releases' - url 'https://s01.oss.sonatype.org/content/repositories/releases/' - } - - mavenLocal() - - configurations.all { - resolutionStrategy.cacheChangingModulesFor 0, 'seconds' - resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' +buildscript { + repositories { + gradlePluginPortal() + maven { + url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" + } } -} - -jar { - manifest { - attributes( - 'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})", - 'Specification-Title': '', - 'Specification-Vendor': 'Exactpro Systems LLC', - 'Implementation-Title': project.archivesBaseName, - 'Implementation-Vendor': 'Exactpro Systems LLC', - 'Implementation-Vendor-Id': 'com.exactpro', - 'Implementation-Version': project.version - ) + dependencies { + classpath "com.exactpro.th2:th2-gradle-plugin:0.0.1-dev-5915772757-13a28ae-SNAPSHOT" } } -dependencies { - api platform('com.exactpro.th2:bom:4.2.0') - implementation 'com.exactpro.th2:common:5.2.0-dev' - implementation 'com.exactpro.th2:codec:5.2.0-dev' - - implementation "org.slf4j:slf4j-api" +apply plugin: "com.exactpro.th2.common-conventions" +apply plugin: 
"com.exactpro.th2.docker-conventions" - implementation 'net.sourceforge.javacsv:javacsv:2.0' - implementation 'org.jetbrains:annotations:23.0.0' +apply plugin: 'kotlin-kapt' - compileOnly 'com.google.auto.service:auto-service-annotations:1.0.1' - annotationProcessor 'com.google.auto.service:auto-service:1.0.1' - - testImplementation 'org.mockito:mockito-core:3.5.15' - testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2' -} - -test { - useJUnitPlatform() -} - -application { - mainClassName 'com.exactpro.th2.codec.MainKt' -} +dependencies { + api platform("com.exactpro.th2:bom:4.5.0") + implementation "com.exactpro.th2:common:5.4.0-dev" + implementation "com.exactpro.th2:codec:5.3.0-new-proto-+" + implementation "com.exactpro.th2:common-utils:2.2.0-dev" -applicationName = 'service' + implementation "org.apache.commons:commons-lang3" + implementation "com.fasterxml.jackson.core:jackson-databind" -distTar { - archiveFileName.set("${applicationName}.tar") -} + implementation "net.sourceforge.javacsv:javacsv:2.0" -dockerPrepare { - dependsOn distTar + compileOnly "com.google.auto.service:auto-service-annotations:1.1.1" + annotationProcessor "com.google.auto.service:auto-service:1.1.1" + kapt "com.google.auto.service:auto-service:1.1.1" } -docker { - copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) -} - -dependencyCheck { - formats=['SARIF', 'JSON', 'HTML'] - failBuildOnCVSS=5 - - analyzers { - assemblyEnabled = false - nugetconfEnabled = false - nodeEnabled = false +configurations { + compileClasspath { + resolutionStrategy.activateDependencyLocking() } } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index c602a62..f40f5e8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version = 5.0.0 - -docker_image_name= +release_version=5.1.0 +app_main_class=com.exactpro.th2.codec.MainKt +docker_image_name= \ No newline at end of file diff --git 
a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java index e0c860f..477b162 100644 --- a/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java +++ b/src/main/java/com/exactpro/th2/codec/csv/CodecFactory.java @@ -73,4 +73,4 @@ public void init(@NotNull InputStream inputStream) { @Override public void close() { } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java b/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java deleted file mode 100644 index 08e72cc..0000000 --- a/src/main/java/com/exactpro/th2/codec/csv/CsvCodec.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.exactpro.th2.codec.csv; - -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.csvreader.CsvReader; -import com.exactpro.th2.codec.DecodeException; -import com.exactpro.th2.codec.api.IPipelineCodec; -import com.exactpro.th2.codec.api.IReportingContext; -import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration; -import com.exactpro.th2.common.grpc.AnyMessage; -import com.exactpro.th2.common.grpc.Message; -import com.exactpro.th2.common.grpc.Message.Builder; -import com.exactpro.th2.common.grpc.MessageGroup; -import com.exactpro.th2.common.grpc.MessageID; -import com.exactpro.th2.common.grpc.MessageMetadata; -import com.exactpro.th2.common.grpc.RawMessage; -import com.exactpro.th2.common.grpc.RawMessageMetadata; -import com.exactpro.th2.common.message.MessageUtils; -import com.exactpro.th2.common.value.ValueUtils; -import com.google.protobuf.ByteString; - -public class CsvCodec implements IPipelineCodec { - - private static final Logger LOGGER = LoggerFactory.getLogger(CsvCodec.class); - private static final String HEADER_MSG_TYPE = "Csv_Header"; - private static final String CSV_MESSAGE_TYPE = "Csv_Message"; - private static final String HEADER_FIELD_NAME = "Header"; - - private static final String OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type"; - - private final CsvCodecConfiguration configuration; - private final String[] defaultHeader; - private final Charset charset; - - public CsvCodec(CsvCodecConfiguration configuration) { - this.configuration = requireNonNull(configuration, "'Configuration' parameter"); - - List defaultHeader 
= configuration.getDefaultHeader(); - this.defaultHeader = defaultHeader == null - ? null - : defaultHeader.stream().map(String::trim).toArray(String[]::new); - if (this.defaultHeader != null && this.defaultHeader.length == 0) { - throw new IllegalArgumentException("Default header must not be empty"); - } - LOGGER.info("Default header: {}", configuration.getDefaultHeader()); - charset = Charset.forName(configuration.getEncoding()); - } - - @NotNull - @Override - public MessageGroup decode(@NotNull MessageGroup messageGroup) { - MessageGroup.Builder groupBuilder = MessageGroup.newBuilder(); - Collection errors = new ArrayList<>(); - for (AnyMessage anyMessage : messageGroup.getMessagesList()) { - if (anyMessage.hasMessage()) { - groupBuilder.addMessages(anyMessage); - continue; - } - if (!anyMessage.hasRawMessage()) { - LOGGER.error("Message should either have a raw or parsed message but has nothing: {}", anyMessage); - continue; - } - RawMessage rawMessage = anyMessage.getRawMessage(); - String protocol = rawMessage.getMetadata().getProtocol(); - if (!"".equals(protocol) && !"csv".equalsIgnoreCase(protocol)) { - LOGGER.error("Wrong protocol: message should have empty or 'csv' protocol but has {}", protocol); - continue; - } - - ByteString body = rawMessage.getBody(); - List data = decodeValues(body); - if (data.isEmpty()) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("The raw message does not contains any data: {}", MessageUtils.toJson(rawMessage)); - } - errors.add(new ErrorHolder("The raw message does not contains any data", rawMessage)); - continue; - } - decodeCsvData(errors, groupBuilder, rawMessage, data); - } - if (!errors.isEmpty()) { - throw createException(errors); - } - return groupBuilder.build(); - } - - @NotNull - @Override - public MessageGroup decode(@NotNull MessageGroup messageGroup, @NotNull IReportingContext iReportingContext) { - return decode(messageGroup); - } - - private RuntimeException createException(Collection errors) { - return new 
DecodeException( - "Cannot decode some messages: " + System.lineSeparator() + errors.stream() - .map(it -> "Message " + MessageUtils.toJson(it.originalMessage.getMetadata().getId()) + " cannot be decoded because " + it.text) - .collect(Collectors.joining(System.lineSeparator())) - ); - } - - @NotNull - @Override - public MessageGroup encode(@NotNull MessageGroup messageGroup) { - throw new UnsupportedOperationException("encode method is not implemented"); - } - - @NotNull - @Override - public MessageGroup encode(@NotNull MessageGroup messageGroup, @NotNull IReportingContext iReportingContext) { - throw new UnsupportedOperationException("encode method is not implemented"); - } - - @Override - public void close() { - } - - private void decodeCsvData(Collection errors, MessageGroup.Builder groupBuilder, RawMessage rawMessage, Iterable data) { - RawMessageMetadata originalMetadata = rawMessage.getMetadata(); - - final String outputMessageType = originalMetadata.getPropertiesOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE); - - int currentIndex = 0; - String[] header = defaultHeader; - for (String[] strings : data) { - currentIndex++; - - if (strings.length == 0) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("Empty raw at {} index (starts with 1). 
Data: {}", currentIndex, data); - } - errors.add(new ErrorHolder("Empty raw at " + currentIndex + " index (starts with 1)", rawMessage)); - continue; - } - - trimEachElement(strings); - - if (header == null) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Set header to: " + Arrays.toString(strings)); - } - header = strings; - - if (configuration.isPublishHeader()) { - AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder(); - Builder headerMsg = Message.newBuilder(); - // Not set message type - setMetadata(originalMetadata, headerMsg, HEADER_MSG_TYPE, currentIndex); - headerMsg.putFields(HEADER_FIELD_NAME, ValueUtils.toValue(strings)); - messageBuilder.setMessage(headerMsg); - } - - continue; - } - - if (strings.length != header.length && configuration.getValidateLength()) { - String msg = String.format("Wrong fields count in message. Expected count: %d; actual: %d; session alias: %s", - header.length, strings.length, originalMetadata.getId().getConnectionId().getSessionAlias()); - LOGGER.error(msg); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(rawMessage.toString()); - } - errors.add(new ErrorHolder(msg, strings, rawMessage)); - } - - AnyMessage.Builder messageBuilder = groupBuilder.addMessagesBuilder(); - - Builder builder = Message.newBuilder(); - // Not set message type - setMetadata(originalMetadata, builder, outputMessageType, currentIndex); - - int headerLength = header.length; - int rowLength = strings.length; - for (int i = 0; i < headerLength && i < rowLength; ) { - int extraLength = getHeaderArrayLength(header, i); - if (extraLength == 1) { - builder.putFields(header[i], ValueUtils.toValue(strings[i])); - i++; - } else { - String[] values = copyArray(strings, i, i+extraLength); - builder.putFields(header[i], ValueUtils.toValue(values)); - i+=extraLength; - } - } - - messageBuilder.setMessage(builder); - } - } - - public static String [] copyArray(String [] original, int from, int to){ - String [] copyArr = new String[Integer.min(to, 
original.length) - from]; - for (int i = from; i < to && i < original.length; i++){ - copyArr[i-from] = original[i]; - } - return copyArr; - } - - private int getHeaderArrayLength(String[] header, int index) { - int length = 1; - for (int i = index + 1; i < header.length && header[i].isEmpty(); i++) { - length++; - } - return length; - } - - private void setMetadata(RawMessageMetadata originalMetadata, Message.Builder messageBuilder, String messageType, int currentIndex) { - messageBuilder.setMetadata(MessageMetadata - .newBuilder() - .setId(MessageID.newBuilder(originalMetadata.getId()) - .setTimestamp(originalMetadata.getId().getTimestamp()) - .addSubsequence(currentIndex) - .build()) - - .putAllProperties(originalMetadata.getPropertiesMap()) - .setMessageType(messageType) - ); - } - - private List decodeValues(ByteString body) { - try (InputStream in = new ByteArrayInputStream(body.toByteArray())) { - CsvReader reader = new CsvReader(in, configuration.getDelimiter(), charset); - try { - List result = new ArrayList<>(); - while (reader.readRecord()) { - result.add(reader.getValues()); - } - return result; - } finally { - reader.close(); - } - } catch (IOException e) { - throw new RuntimeException("cannot read data from raw bytes", e); - } - } - - private static class ErrorHolder { - public final String text; - public final String[] currentRow; - public final RawMessage originalMessage; - - private ErrorHolder(String text, String[] currentRow, RawMessage originalMessage) { - this.text = text; - this.currentRow = currentRow; - this.originalMessage = originalMessage; - } - - private ErrorHolder(String text, RawMessage originalMessage) { - this(text, null, originalMessage); - } - } - - private void trimEachElement(String[] elements) { - for (int i = 0; i < elements.length; i++) { - String element = elements[i]; - elements[i] = element.trim(); - } - } -} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/csv/CsvCodec.kt 
/*
 * Copyright 2023 Exactpro (Exactpro Systems Limited)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.exactpro.th2.codec.csv

import com.csvreader.CsvReader
import com.exactpro.th2.codec.DecodeException
import com.exactpro.th2.codec.api.IPipelineCodec
import com.exactpro.th2.codec.api.IReportingContext
import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration
import com.exactpro.th2.common.grpc.AnyMessage as ProtoAnyMessage
import com.exactpro.th2.common.grpc.Value as ProtoValue
import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup
import com.exactpro.th2.common.grpc.Message as ProtoMessage
import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage
import com.exactpro.th2.common.grpc.Message as ProtoParsedMessage
import com.exactpro.th2.common.grpc.MessageMetadata as ProtoMessageMetadata
import com.exactpro.th2.common.grpc.RawMessageMetadata as ProtoRawMessageMetadata
import com.exactpro.th2.common.grpc.MessageID as ProtoMessageID
import com.exactpro.th2.common.message.toJson
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.*
import com.exactpro.th2.common.utils.message.id
import com.exactpro.th2.common.utils.message.logId
import com.exactpro.th2.common.utils.message.sessionAlias
import com.exactpro.th2.common.utils.message.transport.logId
import com.exactpro.th2.common.value.toValue
import mu.KotlinLogging
import java.io.ByteArrayInputStream
import java.io.IOException
import java.nio.charset.Charset
import kotlin.math.min

/**
 * Pipeline codec that decodes raw CSV payloads into parsed messages.
 *
 * The same decoding algorithm ([Decoder]) is applied to both message models the
 * codec accepts: protobuf groups ([ProtoDecoder]) and th2 transport groups
 * ([TransportDecoder]).
 *
 * @param config codec settings: encoding, delimiter, optional default header,
 * header publishing and row-length validation flags
 * @throws IllegalArgumentException if a default header is configured but empty
 */
class CsvCodec(private val config: CsvCodecConfiguration) : IPipelineCodec {

    private val charset: Charset = Charset.forName(config.encoding)

    // Trimmed copy of the configured header; stays null when no default header is configured,
    // in which case the first row of every raw message is used as the header.
    private val defaultHeader: Array<String>? = config.defaultHeader
        ?.map(String::trim)
        ?.toTypedArray()
        ?.apply {
            require(isNotEmpty()) { "Default header must not be empty" }
            LOGGER.info { "Default header: ${config.defaultHeader}" }
        }

    private val protoDecoder =
        ProtoDecoder(charset, config.delimiter, defaultHeader, config.isPublishHeader, config.validateLength)
    private val transportDecoder =
        TransportDecoder(charset, config.delimiter, defaultHeader, config.isPublishHeader, config.validateLength)

    override fun decode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup =
        decode(messageGroup)

    /**
     * Decodes every raw message of a protobuf group.
     *
     * @throws DecodeException if at least one row/message could not be decoded
     */
    override fun decode(messageGroup: ProtoMessageGroup): ProtoMessageGroup {
        val decodedMessages = protoDecoder.decode(messageGroup.messagesList)
        return ProtoMessageGroup.newBuilder().addAllMessages(decodedMessages).build()
    }

    override fun decode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup =
        decode(messageGroup)

    /**
     * Decodes every raw message of a transport group.
     *
     * @throws DecodeException if at least one row/message could not be decoded
     */
    override fun decode(messageGroup: MessageGroup): MessageGroup {
        val decodedMessages = transportDecoder.decode(messageGroup.messages)
        return MessageGroup(decodedMessages)
    }

    /**
     * Model-independent CSV decoding algorithm.
     *
     * Subclasses supply the accessors for a concrete message model via the abstract
     * extension members below.
     *
     * @param ANY_MESSAGE element type of a message group (raw or parsed)
     * @param RAW_MESSAGE raw message type of the model
     * @param PARSED_MESSAGE parsed message type of the model (type marker only; not referenced here)
     * @param BODY_FIELD_VALUE type of a single field value in a parsed message body
     */
    private abstract class Decoder<ANY_MESSAGE, RAW_MESSAGE, PARSED_MESSAGE, BODY_FIELD_VALUE>(
        private val charset: Charset,
        private val csvDelimiter: Char,
        private val defaultHeader: Array<String>?,
        private val publishHeader: Boolean,
        private val validateLength: Boolean
    ) {
        protected abstract val RAW_MESSAGE.messageMetadata: Map<String, String>
        protected abstract val RAW_MESSAGE.messageSessionAlias: String
        protected abstract val RAW_MESSAGE.rawBody: ByteArray
        protected abstract val RAW_MESSAGE.messageProtocol: String
        protected abstract val RAW_MESSAGE.logId: String
        protected abstract val RAW_MESSAGE.logData: String
        protected abstract val ANY_MESSAGE.isParsed: Boolean
        protected abstract val ANY_MESSAGE.isRaw: Boolean
        protected abstract val ANY_MESSAGE.asRaw: RAW_MESSAGE

        protected abstract fun createParsedMessage(
            sourceMessage: RAW_MESSAGE,
            outputMessageType: String,
            body: Map<String, BODY_FIELD_VALUE>,
            currentIndex: Int
        ): ANY_MESSAGE

        protected abstract fun String.toFieldValue(): BODY_FIELD_VALUE
        protected abstract fun Array<String>.toFieldValue(): BODY_FIELD_VALUE

        /**
         * Decodes the messages of one group.
         *
         * Already parsed messages are passed through unchanged. Messages that are neither
         * raw nor parsed, and raw messages whose protocol is neither empty nor `csv`
         * (case-insensitive), are logged and left out of the result.
         *
         * @return the decoded messages in input order
         * @throws DecodeException when any error was collected while decoding the group
         */
        fun decode(messageGroup: List<ANY_MESSAGE>): List<ANY_MESSAGE> {
            val groupBuilder = mutableListOf<ANY_MESSAGE>()
            val errors: MutableCollection<ErrorHolder<RAW_MESSAGE>> = mutableListOf()
            for (anyMessage in messageGroup) {
                if (anyMessage.isParsed) {
                    groupBuilder += anyMessage
                    continue
                }
                if (!anyMessage.isRaw) {
                    LOGGER.error { "Message should either have a raw or parsed message but has nothing: $anyMessage" }
                    continue
                }
                val rawMessage = anyMessage.asRaw
                val protocol = rawMessage.messageProtocol
                if ("" != protocol && !"csv".equals(protocol, ignoreCase = true)) {
                    LOGGER.error { "Wrong protocol: message should have empty or 'csv' protocol but has $protocol" }
                    continue
                }
                val data = decodeValues(rawMessage.rawBody)
                if (data.isEmpty()) {
                    LOGGER.error { "The raw message does not contains any data: ${rawMessage.logData}" }
                    errors.add(ErrorHolder("The raw message does not contains any data", rawMessage))
                    continue
                }

                decodeCsvData(errors, groupBuilder, rawMessage, data)
            }
            if (errors.isNotEmpty()) {
                throw DecodeException(
                    "Cannot decode some messages:\n" + errors.joinToString("\n") {
                        "Message ${it.originalMessage.logId} cannot be decoded because ${it.text}"
                    }
                )
            }
            return groupBuilder
        }

        /**
         * Converts the CSV rows of one raw message into parsed messages appended to [groupBuilder].
         *
         * When no default header is configured, the first non-empty row becomes the header
         * (and is published as a `Csv_Header` message if header publishing is enabled).
         * Problems are appended to [errors]; a row with a wrong field count is still decoded.
         */
        private fun decodeCsvData(
            errors: MutableCollection<ErrorHolder<RAW_MESSAGE>>,
            groupBuilder: MutableList<ANY_MESSAGE>,
            rawMessage: RAW_MESSAGE,
            data: Iterable<Array<String>>
        ) {
            val originalMetadata = rawMessage.messageMetadata
            val outputMessageType =
                originalMetadata.getOrDefault(OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE, CSV_MESSAGE_TYPE)
            var currentIndex = 0
            var header: Array<String>? = defaultHeader
            for (strings in data) {
                currentIndex++
                if (strings.isEmpty()) {
                    LOGGER.error { "Empty raw at $currentIndex index (starts with 1). Data: $data" }
                    errors.add(ErrorHolder("Empty raw at $currentIndex index (starts with 1)", rawMessage))
                    continue
                }
                trimEachElement(strings)
                if (header == null) {
                    LOGGER.debug { "Set header to: ${strings.contentToString()}" }
                    header = strings
                    if (publishHeader) {
                        groupBuilder += createParsedMessage(
                            rawMessage,
                            HEADER_MSG_TYPE,
                            mapOf(HEADER_FIELD_NAME to strings.toFieldValue()),
                            currentIndex
                        )
                    }
                    continue
                }
                if (strings.size != header.size && validateLength) {
                    val msg = "Wrong fields count in message. Expected count: ${header.size}; " +
                        "actual: ${strings.size}; session alias: ${rawMessage.messageSessionAlias}"
                    LOGGER.error(msg)
                    LOGGER.debug { rawMessage.logData }
                    errors.add(ErrorHolder(msg, rawMessage, strings))
                }

                val headerLength = header.size
                val rowLength = strings.size
                var i = 0
                val body = mutableMapOf<String, BODY_FIELD_VALUE>()
                while (i < headerLength && i < rowLength) {
                    // A header name followed by empty header cells owns all of those columns as an array value.
                    val extraLength = getHeaderArrayLength(header, i)
                    if (extraLength == 1) {
                        body[header[i]] = strings[i].toFieldValue()
                        i++
                    } else {
                        val values = copyArray(strings, i, i + extraLength)
                        body[header[i]] = values.toFieldValue()
                        i += extraLength
                    }
                }

                groupBuilder += createParsedMessage(rawMessage, outputMessageType, body, currentIndex)
            }
        }

        /** Copies `original[from, to)`, clamping [to] to the array size. */
        private fun copyArray(original: Array<String>, from: Int, to: Int): Array<String> =
            original.copyOfRange(from, min(to, original.size))

        /** Returns 1 plus the number of consecutive empty header cells that follow [index]. */
        private fun getHeaderArrayLength(header: Array<String>, index: Int): Int {
            var length = 1
            var i = index + 1
            while (i < header.size && header[i].isEmpty()) {
                length++
                i++
            }
            return length
        }

        /**
         * Parses [body] as CSV using the configured delimiter and charset.
         *
         * @return one array of cell values per record
         * @throws RuntimeException wrapping the [IOException] raised while reading
         */
        private fun decodeValues(body: ByteArray): List<Array<String>> {
            try {
                ByteArrayInputStream(body).use {
                    val reader = CsvReader(it, csvDelimiter, charset)
                    return try {
                        val result: MutableList<Array<String>> = ArrayList()
                        while (reader.readRecord()) {
                            result.add(reader.values)
                        }
                        result
                    } finally {
                        reader.close()
                    }
                }
            } catch (e: IOException) {
                throw RuntimeException("cannot read data from raw bytes", e)
            }
        }

        /** A decoding problem together with the raw message (and optionally the row) it relates to. */
        private class ErrorHolder<T>(
            val text: String,
            val originalMessage: T,
            val currentRow: Array<String> = emptyArray()
        )

        /** Trims every element of [elements] in place. */
        private fun trimEachElement(elements: Array<String>) {
            for (i in elements.indices) {
                elements[i] = elements[i].trim()
            }
        }
    }

    /** [Decoder] binding for the protobuf message model. */
    private class ProtoDecoder(
        charset: Charset,
        csvDelimiter: Char,
        defaultHeader: Array<String>?,
        publishHeader: Boolean,
        validateLength: Boolean
    ) : Decoder<ProtoAnyMessage, ProtoRawMessage, ProtoParsedMessage, ProtoValue>(
        charset, csvDelimiter, defaultHeader, publishHeader, validateLength
    ) {

        override val ProtoRawMessage.messageMetadata: Map<String, String> get() = metadata.propertiesMap
        override val ProtoRawMessage.messageSessionAlias: String
            get() = sessionAlias ?: error("No session alias in message")

        override val ProtoRawMessage.rawBody: ByteArray get() = body.toByteArray()
        override val ProtoRawMessage.messageProtocol: String get() = metadata.protocol
        override val ProtoRawMessage.logId: String get() = this.id.logId
        override val ProtoRawMessage.logData: String get() = toJson()
        override val ProtoAnyMessage.isParsed: Boolean get() = hasMessage()
        override val ProtoAnyMessage.isRaw: Boolean get() = hasRawMessage()
        override val ProtoAnyMessage.asRaw: ProtoRawMessage get() = rawMessage

        override fun createParsedMessage(
            sourceMessage: ProtoRawMessage,
            outputMessageType: String,
            body: Map<String, ProtoValue>,
            currentIndex: Int
        ): ProtoAnyMessage {
            val builder = ProtoMessage.newBuilder().putAllFields(body)
            // Copy the parent event id only when the source has one: setting a protobuf message
            // field to its default instance would still mark the field as present.
            if (sourceMessage.hasParentEventId()) {
                builder.setParentEventId(sourceMessage.parentEventId)
            }

            setMetadata(sourceMessage.metadata, builder, outputMessageType, currentIndex)
            return ProtoAnyMessage.newBuilder().setMessage(builder).build()
        }

        override fun String.toFieldValue(): ProtoValue = toValue()
        override fun Array<String>.toFieldValue(): ProtoValue = toValue()

        /**
         * Fills the parsed message metadata from the raw one: same id with [currentIndex]
         * appended as a subsequence, all properties copied, and [messageType] set.
         */
        private fun setMetadata(
            originalMetadata: ProtoRawMessageMetadata,
            messageBuilder: ProtoMessage.Builder,
            messageType: String,
            currentIndex: Int
        ) {
            messageBuilder.setMetadata(
                ProtoMessageMetadata
                    .newBuilder()
                    .setId(
                        ProtoMessageID.newBuilder(originalMetadata.id)
                            .setTimestamp(originalMetadata.id.timestamp)
                            .addSubsequence(currentIndex)
                            .build()
                    )
                    .putAllProperties(originalMetadata.propertiesMap)
                    .setMessageType(messageType)
            )
        }
    }

    /** [Decoder] binding for the th2 transport message model. */
    private class TransportDecoder(
        charset: Charset,
        csvDelimiter: Char,
        defaultHeader: Array<String>?,
        publishHeader: Boolean,
        validateLength: Boolean
    ) : Decoder<Message<*>, RawMessage, ParsedMessage, Any>(
        charset, csvDelimiter, defaultHeader, publishHeader, validateLength
    ) {

        override val RawMessage.messageMetadata: Map<String, String> get() = this.metadata
        override val RawMessage.messageSessionAlias: String get() = id.sessionAlias

        override val RawMessage.rawBody: ByteArray get() = body.toByteArray()
        override val RawMessage.messageProtocol: String get() = protocol
        override val RawMessage.logId: String get() = id.logId
        override val RawMessage.logData: String get() = this.toString()
        override val Message<*>.isParsed: Boolean get() = this is ParsedMessage
        override val Message<*>.isRaw: Boolean get() = this is RawMessage
        override val Message<*>.asRaw: RawMessage get() = this as RawMessage

        override fun createParsedMessage(
            sourceMessage: RawMessage,
            outputMessageType: String,
            body: Map<String, Any>,
            currentIndex: Int
        ): Message<*> {
            return ParsedMessage(
                id = sourceMessage.id.toBuilder().addSubsequence(currentIndex).build(),
                eventId = sourceMessage.eventId,
                type = outputMessageType,
                metadata = sourceMessage.metadata,
                body = body
            )
        }

        override fun String.toFieldValue(): String = this
        override fun Array<String>.toFieldValue(): Any = this
    }

    companion object {
        private val LOGGER = KotlinLogging.logger {}
        private const val HEADER_MSG_TYPE = "Csv_Header"
        private const val CSV_MESSAGE_TYPE = "Csv_Message"
        private const val HEADER_FIELD_NAME = "Header"
        private const val OVERRIDE_MESSAGE_TYPE_PROP_NAME_LOWERCASE = "th2.csv.override_message_type"
    }
}
use this file except in compliance with the License. @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; diff --git a/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt new file mode 100644 index 0000000..79ed76b --- /dev/null +++ b/src/test/java/com/exactpro/th2/codec/csv/TestCsvCodecTransport.kt @@ -0,0 +1,72 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package com.exactpro.th2.codec.csv

import com.exactpro.th2.codec.csv.cfg.CsvCodecConfiguration
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction
import io.netty.buffer.Unpooled
import org.apache.commons.lang3.StringUtils
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.nio.charset.StandardCharsets
import java.time.Instant

/** Tests of [CsvCodec] decoding for the th2 transport message model. */
class TestCsvCodecTransport {

    /** A single CSV row is mapped onto the configured default header, one body field per column. */
    @Test
    fun decodesDataUsingDefaultHeader() {
        val configuration = CsvCodecConfiguration()
        configuration.defaultHeader = listOf("A", "B", "C")
        val codec = CsvCodec(configuration)
        val group = MessageGroup(listOf(createCsvMessage("1,2,3")))

        val decodedGroup = codec.decode(group)

        Assertions.assertEquals(1, decodedGroup.messages.size)
        val message = decodedGroup.messages[0] as ParsedMessage

        Assertions.assertEquals(3, message.body.size)
        Assertions.assertEquals(1, message.id.subsequence.size)
        Assertions.assertEquals(1, message.id.subsequence[0])
        Assertions.assertEquals("Csv_Message", message.type)
        Assertions.assertEquals("1", message.body["A"])
        Assertions.assertEquals("2", message.body["B"])
        Assertions.assertEquals("3", message.body["C"])
    }

    /** Builds a raw message without metadata properties; each element of [data] is one CSV line. */
    private fun createCsvMessage(vararg data: String): RawMessage {
        return createCsvMessage(emptyMap(), *data)
    }

    /** Builds a raw message whose body is the UTF-8 bytes of [data] joined with line feeds. */
    private fun createCsvMessage(metadataProps: Map<String, String> = mapOf(), vararg data: String): RawMessage {
        val body = data.joinToString(StringUtils.LF).toByteArray(StandardCharsets.UTF_8)
        return RawMessage(
            id = MessageId(
                "alias_01",
                Direction.INCOMING,
                System.nanoTime(),
                Instant.now()
            ),
            metadata = metadataProps,
            body = Unpooled.wrappedBuffer(body)
        )
    }
}