https://github.com/opensearch-project/data-prepper/issues/2445 #2309

Closed
wants to merge 126 commits

Conversation

mahesh724

Description

  • InputCodec interface added in data-prepper-api (a sketch of the interface follows this list)
  • ParquetInputCodec implementation added in data-prepper-plugins/parquet-codecs module
  • Repository Restructured
  • JSONCodec moved from S3Source Plugin to parse-json-processor package
  • CSVCodec moved from S3Source Plugin to csv-processor package
  • NewLine Codec moved from S3Source Plugin to newline-codecs package
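
For reference, a minimal sketch of the new interface, reconstructed from the package name and parse(...) signature discussed in the review below (the actual file in this PR may carry additional javadoc or methods):

package org.opensearch.dataprepper.model.codec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

public interface InputCodec {
    /**
     * Parses an {@link InputStream} from a source plugin (e.g. S3) and hands
     * each decoded {@link Event} to the given consumer.
     */
    void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
}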

Issues Resolved

Resolves #1532

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

mahesh724 and others added 7 commits January 23, 2023 13:47
-Repository Restructured
-JSONCodec moved from S3Source Plugin to parse-json-processor package
-CSVCodec moved from S3Source Plugin to csv-processor package
-NewLine Codec moved from S3Source Plugin to newline-codecs package
@mahesh724 mahesh724 requested a review from a team as a code owner February 27, 2023 17:25
Member @dlvenable left a comment:

@mahesh724, thank you for your contribution! I left some comments.

I see an .exe file in here. Please remove that.

@@ -0,0 +1,18 @@
package org.opensearch.dataprepper.model.codec;
Member:

All new files should have the following header:

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

@@ -12,7 +12,7 @@
import java.util.Objects;

/**
* Configuration class for {@link CsvCodec}.
* Configuration class for {@link CsvInputCodec}.
*/
public class CsvCodecConfig {
Member:

Please rename this class to CsvInputCodecConfig. There might be some differences between this and an output codec.

Collaborator:

@dlvenable is this the correct path? data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/csvinputcodec/CsvCodecConfig.java - this has both data-prepper-plugins/csv-processor and dataprepper/plugins in it. I would think the path would be something like data-prepper-plugins/inputcodecs/src/main/java/org/opensearch/csvcodec or something similar.

Member:

I think this should be in the csv-processor. We could reduce code duplication by keeping these CSV plugins together. I don't see any benefit to end users in splitting these out, since they share the same dependencies.

@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CsvCodecConfig.class)
public class CsvCodec implements Codec {
private static final Logger LOG = LoggerFactory.getLogger(CsvCodec.class);
@DataPrepperPlugin(name = "Csv", pluginType = InputCodec.class, pluginConfigurationType = CsvCodecConfig.class)
Member:

Please use the original case for the name. name = "csv".
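
That is, combining the two diff lines above, the annotation would keep the lower-case name while switching to the new plugin type:

@DataPrepperPlugin(name = "csv", pluginType = InputCodec.class, pluginConfigurationType = CsvCodecConfig.class)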

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.csvinputcodec;
Member:

I think a better package name would be either of the following:

org.opensearch.dataprepper.plugins.csv.codec

or

org.opensearch.dataprepper.plugins.codec.csv

The latter fits the current package name for the csv-processor project.

*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.jsonCodec;
Member:

Please use either of the following package names:

org.opensearch.dataprepper.plugins.json.codec

or

org.opensearch.dataprepper.plugins.codec.json

Collaborator:

This file path data-prepper-plugins/json-codec/src/main/java/org/opensearch/dataprepper/jsonCodec/JsonInputCodec.java does not look correct; the json codec appears twice in it.

Collaborator:

I see two JsonInputCodec.java files in this PR. Please clean up and re-submit.

Also, for json, we need the json parsing functionality in a library so that we can re-use it in other places (like future DLQ source code)

Author:

What exactly do you mean by "in a library"? The existing JsonInputCodec can be used anywhere within data-prepper-plugins for JSON parsing functionality.

Do you mean to use the JSON parsing functionality inside other data-prepper packages instead of only data-prepper-plugins?

Member:

@mahesh724, we should not have upper-case package names in Java. I think you can change the package to one of the original suggestions, which should resolve both my comment and @kkondaka's comment.

And yes, we should have only one JsonInputCodec.

Contributor:

Mahesh - Wouldn't renaming the package as per David's comment address this issue?

Author:

@ashoktelukuntla yes, renaming the package to org.opensearch.dataprepper.plugins.codec.json and removing one JsonInputCodec.java class would resolve the issue.

@@ -13,6 +13,10 @@ repositories {

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:parquet-codecs')
Member:

This project should not include these codecs. The integration test implementation should have them for testing. But, not in implementation.

Author @mahesh724 (Mar 1, 2023):

The S3 source has an integrationTest folder inside the src folder that contains the CsvRecordsGenerator, JsonRecordGenerator and NewLineDelimitedRecordsGenerator classes, which require the CsvInputCodec, JsonInputCodec and NewlineDelimetedInputCodec classes respectively. If we put those codecs in testImplementation, then the mentioned classes would not be accessible inside s3-source's src/integrationTest.

Do we need to use only the old S3 codec implementation inside integrationTest?

settings.gradle Outdated
include 'data-prepper-plugins:json-codec'
include 'data-prepper-plugins:parquet-codecs'
include 'data-prepper-plugins:newline-codecs'
findProject(':data-prepper-plugins:newline-codecs')?.name = 'newline-codecs'
Member:

I'm unsure what you are trying to do here, but this does not seem right.

Author @mahesh724 (Mar 1, 2023):

findProject(':data-prepper-plugins:newline-codecs')?.name = 'newline-codecs'
This line gets autogenerated the first time we build the newline-codecs module, but it is redundant and could be removed.

File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX);
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);

ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build());
Member:

Are there any other options rather than using a local file?

I understand that as a columnar format you need all the lines to put together any given event. But, maybe there is some other approach available here?

Author:

This requires more investigation, as we were not able to find any method/library that lets us create a ParquetReader object directly by passing an InputStream to the constructor and extract all the required details.
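
For reference, one possible direction (an untested sketch, not part of this PR) is to implement Parquet's org.apache.parquet.io.InputFile over an in-memory byte array, which would let the reader seek without a temp file, at the cost of buffering the whole object in memory:

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.ByteArrayInputStream;

// Sketch: an InputFile backed by a byte[]; the whole stream must fit in memory.
class InMemoryInputFile implements InputFile {
    private final byte[] data;

    InMemoryInputFile(final byte[] data) {
        this.data = data;
    }

    @Override
    public long getLength() {
        return data.length;
    }

    @Override
    public SeekableInputStream newStream() {
        final ByteArrayInputStream stream = new ByteArrayInputStream(data);
        return new DelegatingSeekableInputStream(stream) {
            @Override
            public long getPos() {
                return data.length - stream.available();   // bytes consumed so far
            }

            @Override
            public void seek(final long newPos) {
                stream.reset();   // ByteArrayInputStream's mark defaults to offset 0
                stream.skip(newPos);
            }
        };
    }
}

ParquetFileReader.open(new InMemoryInputFile(bytes)) could then replace the HadoopInputFile/temp-file combination shown above.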


for (int row = 0; row < rows; row++) {
SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
eventData.put(MESSAGE_FIELD_NAME, simpleGroup.toString());
Member:

Why is this put as a string?

Author:

When we try to put the SimpleGroup into the eventData hashmap and build an Event out of it:

Map<String, SimpleGroup> eventData = new HashMap<>();
eventData.put(MESSAGE_FIELD_NAME, simpleGroup);
final Event event = JacksonLog.builder().withData(eventData).build();

Then we are getting an exception:

No serializer found for class org.apache.parquet.schema.LogicalTypeAnnotation$StringLogicalTypeAnnotation and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.HashMap["message"]->org.apache.parquet.example.data.simple.SimpleGroup["type"]->org.apache.parquet.schema.MessageType["fields"]->java.util.ArrayList[0]->org.apache.parquet.schema.PrimitiveType["logicalTypeAnnotation"])
at com.fasterxml.jackson.databind.ObjectMapper.valueToTree(ObjectMapper.java:3442)

Member:

I'm not very familiar with the SimpleGroup class and the documentation I found is lacking.

In the end, the Event should have keys and values representing the Parquet results. This will create an Event with a single key-value where the value is some arbitrary string.
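
To make that concrete, a hedged sketch of emitting one key-value pair per Parquet column instead of a single string, reusing the schema/simpleGroup/getValueToString extraction code that appears later in this PR (this is a fragment from inside parse(...), not a complete class):

// Flatten one Parquet row into individual key/value pairs rather than
// one arbitrary "message" string.
final Map<String, Object> eventData = new HashMap<>();
for (int fieldIndex = 0; fieldIndex < schema.getFields().size(); fieldIndex++) {
    final Type field = schema.getFields().get(fieldIndex);
    eventData.put(field.getName(), simpleGroup.getValueToString(fieldIndex, 0));
}
final Event event = JacksonLog.builder().withData(eventData).build();
eventConsumer.accept(new Record<>(event));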

ParquetMetadata footer = parquetFileReader.getFooter();
MessageType schema = createdParquetSchema(footer);

List<SimpleGroup> simpleGroups = new ArrayList<>();
Member:

What is the purpose of this list?

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
Collaborator:

We should not use "*". Import the individual classes that are used here.

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
Collaborator:

Do not use "*"


mahesh724 and others added 3 commits March 2, 2023 20:49
Description:
- Package naming convention changed
- @TempDir used in ParquetInputCodecTest class
- Gradle build files updated
- Wildcard imports removed

Signed-off-by: Mahesh Kariya <kariyamahesh82@gmail.com>
-Deleted all unwanted files

Signed-off-by: Mahesh Kariya <kariyamahesh82@gmail.com>
implementation project(':data-prepper-api')
implementation 'org.apache.parquet:parquet-hadoop:1.12.0'
implementation 'org.apache.hadoop:hadoop-common:3.3.3'
implementation 'org.apache.parquet:parquet-avro:1.10.1'
Member:

Do we need these Avro dependencies?

Contributor:

Yes, we do require these dependencies inside the test class to generate the schema and to create a random parquet stream. So implementation needs to be changed to testImplementation.

Member:

Yes, please change to use testImplementation.
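
A build-script sketch of that change (artifact versions copied from the build.gradle lines quoted above):

dependencies {
    implementation project(':data-prepper-api')
    implementation 'org.apache.parquet:parquet-hadoop:1.12.0'
    implementation 'org.apache.hadoop:hadoop-common:3.3.3'

    // Only the tests need Avro, to generate schemas and random parquet streams.
    testImplementation 'org.apache.parquet:parquet-avro:1.10.1'
    testImplementation 'org.apache.avro:avro:1.9.0'
}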

Member @dlvenable left a comment:

It seems that when you moved the packages you duplicated a lot of code. I see the same classes repeated. Can you clean these up?

@@ -0,0 +1,59 @@
package org.opensearch.dataprepper.plugins.jsoninputcodec;
Member:

Why do we have this class twice? Can we remove this one?

@mahesh724 mahesh724 force-pushed the main branch 2 times, most recently from 8abf693 to 78880b8 on March 3, 2023 17:26
mahesh724 and others added 12 commits March 9, 2023 12:51
Signed-off-by: mahesh724 <kariyamahesh82@gmail.com>
Signed-off-by: mahesh724 <kariyamahesh82@gmail.com>
Signed-off-by: mahesh724 <kariyamahesh82@gmail.com>
…fixed a bug in the tests where the data was not sent to the correct sink. (opensearch-project#2061)

Signed-off-by: David Venable <dlv@amazon.com>
…oject#2140)

* Adds ScheduledExecutor Service and runnable task
Signed-off-by: Shivani Shukla <sshkamz@amazon.com>
…ue 2123) (opensearch-project#2124)

* Fix for null pointer exception in remote peer forwarding (fix for issue opensearch-project#2123)

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments to add a counter and not skip when an identification key is missing

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments. Modified to increment the counter only when all identification keys are missing

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Added 'final' to the local variable

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Added a test with all missing keys

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
* Added implementation of s3 support in Opensearch sink

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
…-project#2100)

Bumps [byte-buddy-agent](https://github.com/raphw/byte-buddy) from 1.12.18 to 1.12.20.
- [Release notes](https://github.com/raphw/byte-buddy/releases)
- [Changelog](https://github.com/raphw/byte-buddy/blob/master/release-notes.md)
- [Commits](raphw/byte-buddy@byte-buddy-1.12.18...byte-buddy-1.12.20)

---
updated-dependencies:
- dependency-name: net.bytebuddy:byte-buddy-agent
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@amazon.com>
…-project#2160)

Bumps [byte-buddy-agent](https://github.com/raphw/byte-buddy) from 1.12.20 to 1.12.22.
- [Release notes](https://github.com/raphw/byte-buddy/releases)
- [Changelog](https://github.com/raphw/byte-buddy/blob/master/release-notes.md)
- [Commits](raphw/byte-buddy@byte-buddy-1.12.20...byte-buddy-1.12.22)

---
updated-dependencies:
- dependency-name: net.bytebuddy:byte-buddy-agent
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…ct#2161)

Bumps [byte-buddy](https://github.com/raphw/byte-buddy) from 1.12.18 to 1.12.22.
- [Release notes](https://github.com/raphw/byte-buddy/releases)
- [Changelog](https://github.com/raphw/byte-buddy/blob/master/release-notes.md)
- [Commits](raphw/byte-buddy@byte-buddy-1.12.18...byte-buddy-1.12.22)

---
updated-dependencies:
- dependency-name: net.bytebuddy:byte-buddy
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
dependabot bot and others added 17 commits March 9, 2023 12:57
…arch-project#2214)

Bumps [org.springframework:spring-context](https://github.com/spring-projects/spring-framework) from 5.3.23 to 5.3.25.
- [Release notes](https://github.com/spring-projects/spring-framework/releases)
- [Commits](spring-projects/spring-framework@v5.3.23...v5.3.25)

---
updated-dependencies:
- dependency-name: org.springframework:spring-context
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…pensearch-project#2223)

Bumps [org.springframework:spring-context](https://github.com/spring-projects/spring-framework) from 5.3.23 to 5.3.25.
- [Release notes](https://github.com/spring-projects/spring-framework/releases)
- [Commits](spring-projects/spring-framework@v5.3.23...v5.3.25)

---
updated-dependencies:
- dependency-name: org.springframework:spring-context
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix grok processor to not create a new record

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Fixed checkStyleMain  failure

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@amazon.com>
Initial commit for OTel trace path changes

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
Signed-off-by: Asif Sohail Mohammed <mdasifsohail7@gmail.com>
…rch-project#2332)

Bumps [org.junit.jupiter:junit-jupiter-api](https://github.com/junit-team/junit5) from 5.9.0 to 5.9.2.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](junit-team/junit5@r5.9.0...r5.9.2)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter-api
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…ct#2331)

Bumps org.assertj:assertj-core from 3.21.0 to 3.24.2.

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…arch-project#2333)

Bumps org.apache.logging.log4j:log4j-bom from 2.19.0 to 2.20.0.

---
updated-dependencies:
- dependency-name: org.apache.logging.log4j:log4j-bom
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…ertain objects in peer-to-peer connections. Additionally, it refactors some application configurations to improve integration testing. Fixes opensearch-project#2310. (opensearch-project#2311)

Signed-off-by: David Venable <dlv@amazon.com>
Signed-off-by: GitHub <noreply@github.com>
Co-authored-by: asifsmohammed <asifsmohammed@users.noreply.github.com>
Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
…oject#2343. Updated the AWS CDK as well. (opensearch-project#2345)

Signed-off-by: David Venable <dlv@amazon.com>
Description:
- Package naming convention changed
- @TempDir used in ParquetInputCodecTest class
- Gradle build files updated
- Wildcard imports removed

Signed-off-by: Mahesh Kariya <kariyamahesh82@gmail.com>
-Deleted all unwanted files

Signed-off-by: mahesh724 <kariyamahesh82@gmail.com>
-Bad field scenario handled.
Signed-off-by: umairofficial <umairhusain1010@gmail.com>
Member @dlvenable left a comment:

Thank you @mahesh724 for this contribution! I have requested a few changes for everything except the Parquet changes. I'll take a look at those shortly.

@@ -0,0 +1,2 @@

ssl: false
Member:

Please remove this change. I understand you may have used it for testing, but let's keep this configured the same way it started.

settings.gradle Outdated
@@ -91,4 +91,7 @@ include 'release:docker'
include 'release:maven'
include 'e2e-test:peerforwarder'
include 'rss-source'
include 'data-prepper-plugins:json-codec'
Member:

I see you have the JsonInputCodec in the parse-json-processor Gradle project. So we don't need a new json-codec Gradle project. You can remove this line.

private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

@Override
public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Member:

You can keep these parameters as final as they were in the original code.

@@ -13,6 +13,9 @@ repositories {

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:csv-processor')
Member:

Please make these three dependencies available only as testImplementation.

The S3 Source shouldn't depend on any particular Codec. The integration tests do rely on them, and that is why we need it in the testImplementation.
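
A sketch of the suggested dependency layout (module names taken from elsewhere in this PR; if the integrationTest source set has its own configuration, such as an assumed integrationTestImplementation, the codecs would be added there as well):

dependencies {
    implementation project(':data-prepper-api')

    // Codecs are exercised only by the tests, never by the S3 source itself.
    testImplementation project(':data-prepper-plugins:csv-processor')
    testImplementation project(':data-prepper-plugins:parse-json-processor')
    testImplementation project(':data-prepper-plugins:newline-codecs')
}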

@@ -0,0 +1,14 @@
scan-pipeline:
Member:

Please remove this file. We don't need this file checked in.

*
* @param inputStream The input stream for the source plugin(e.g. S3, Http, RssFeed etc) object
* @param eventConsumer The consumer which handles each event from the stream
*/
Collaborator:

The javadoc should also have an entry for @throws.

Contributor:

I am sorry, could you please elaborate?

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
Collaborator:

Do not use ".*". Please list individual imports.

Contributor:

Done.

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
Collaborator:

Do not use ".*". Please list individual imports.

Contributor:

Done.

private static final Logger LOG = LoggerFactory.getLogger(ParquetInputCodec.class);

@Override
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Collaborator:

Add final to both the arguments.

try {
eventData.put(field.getName(), simpleGroup.getValueToString(fieldIndex, 0));
}
catch (Exception parquetException){
Collaborator:

We should increment some metric here

Contributor:

As per initial discussions and the HLD, it was concluded that no metrics are to be captured at the codec level. We had initially proposed various metrics in the initial versions of the HLD, but David advised us not to keep them.

eventData.put(field.getName(), simpleGroup.getValueToString(fieldIndex, 0));
}
catch (Exception parquetException){
eventData.put(field.getName(), "unknown");
Collaborator:

I think it is better to indicate which field index was not found. Something like this - "unknown" -> "failed to extract index "?

Member:

I'm not sure we should keep this at all. Once Data Prepper supports tagging (#629), it might make sense to tag this event with a failure.

I think for now we have two options: 1) Drop the entire event; 2) Do not include the field in the event at all.

Contributor:

Alright. Not including that particular field in that event for now.

}
}
} catch (Exception parquetException) {
LOG.error("An exception occurred while parsing parquet InputStream ", parquetException);
Collaborator:

Some metric must be incremented here.

Contributor:

As per initial discussions and the HLD, it was concluded that NO metrics are to be captured at the codec level. We had initially proposed various metrics in the initial versions of the HLD, but David advised us not to keep them.

}
return new ByteArrayInputStream(INVALID_PARQUET_INPUT_STREAM.getBytes());
}
}
Collaborator:

After adding metrics in the exception cases (as commented above), add test cases to test the metrics.

Contributor:

I don't think this will be required anymore, as after today's discussion it was concluded that we have to send the exception back to the source plugin, and that source plugin will capture the metric. The test case that will now be required is "test_when_invalid_field_then_throws_exception". Please correct me if I'm wrong.

}
catch (Exception parquetException){
eventData.put(field.getName(), "unknown");
LOG.error("Unreadable or bad record");
Member:

This error message should give some indication as to what was wrong along with context.

LOG.error("Unable to retrieve value for field with name = '{}' with error '{}'", field.getName(), parquetException.getMessage());

Contributor:

Done


private void parseParquetStream(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {

final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX);
Member:

It is important to let the user define the location of the directory where the temp files will be created.

Perhaps we should give an option to define the path to the file. If specified, then we use that path. Otherwise, the default is to create a temp file.

Perhaps something like the following?

codec:
  parquet:
    temp_directory: /usr/share/data-prepper/data/parquet/

You can use this version of createTempFile to allow a user-defined temp directory. https://docs.oracle.com/javase/7/docs/api/java/io/File.html#createTempFile(java.lang.String,%20java.lang.String,%20java.io.File)
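
A sketch of how the codec could honor such an option (temp_directory and getTempDirectory() are hypothetical names, not from this PR):

// If the user configured a directory, create the temp file there; passing
// null to createTempFile falls back to the JVM default (java.io.tmpdir).
final File tempDir = config.getTempDirectory() != null
        ? new File(config.getTempDirectory())
        : null;
final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX, tempDir);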


implementation 'org.apache.parquet:parquet-hadoop:1.12.0'
implementation 'org.apache.hadoop:hadoop-common:3.3.3'
implementation 'org.apache.parquet:parquet-avro:1.10.1'
implementation("org.apache.avro:avro:1.9.0")
Member:

This can also be testImplementation, I believe.

Schema schema = parseSchema();
String OS = System.getProperty("os.name").toLowerCase();

if (OS.contains("win")) {
Member:

This test is making a change which then may influence the actual code it is testing. (It has a side-effect)

After writing the test data, please unset this property to verify that this is not needed by the code under test.

Contributor:

Done.

private static InputStream createInvalidParquetStream() {
String OS = System.getProperty("os.name").toLowerCase();
if (OS.contains("win")) {
System.setProperty("hadoop.home.dir", Paths.get("").toAbsolutePath().toString());
Member:

This is setting a property and has a side-effect. Please see my comment above. I'm not sure this is really needed for setting up the test data.

Contributor:

This test case was failing unless we set hadoop.home.dir. After the test cases are done running, we are unsetting this property.
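
For illustration, a minimal JUnit 5 sketch of that cleanup (assuming the property is set in @BeforeAll or a static initializer):

import org.junit.jupiter.api.AfterAll;

// Restore JVM-wide state once every test in the class has run, so the
// property cannot leak into other test classes in the same JVM.
@AfterAll
static void clearHadoopHomeDir() {
    System.clearProperty("hadoop.home.dir");
}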

Signed-off-by: umairofficial <umairhusain1010@gmail.com>
Signed-off-by: umairofficial <umairhusain1010@gmail.com>
@dlvenable dlvenable mentioned this pull request Apr 1, 2023
@svana svana added the v2.3.0 label Apr 20, 2023
@svana svana changed the title Support for Source Codecs #1532 https://github.com/opensearch-project/data-prepper/issues/2445 Apr 20, 2023
@dlvenable dlvenable added this to the v2.3 milestone Apr 20, 2023
@dlvenable dlvenable removed the v2.3.0 label Apr 20, 2023
dlvenable (Member) commented May 1, 2023

@dlvenable dlvenable closed this May 1, 2023
@dlvenable dlvenable removed this from the v2.3 milestone May 4, 2023