Support for Event Json input and output codecs #4436
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
        .getThis();
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final Record<Event> record = new Record<>(event);
final String externalOriginationTime = (String)metadata.get(EventJsonDefines.EXTERNAL_ORIGINATION_TIME);
You could probably have Jackson handle these conversions by annotating the DefaultEventMetadata properties and deserializing the metadata to the DefaultEventMetadata class
Is the time an Instant in production use cases?
final JacksonLog.Builder logBuilder = JacksonLog.builder()
        .withData(data)
        .withEventMetadataAttributes((Map<String, Object>)metadata.get(EventJsonDefines.ATTRIBUTES))
        .withTimeReceived(Instant.parse((String)metadata.get(EventJsonDefines.TIME_RECEIVED)))
We should make this configurable and default to not passing anything here so that the new time the Event reaches the source is applied to the Event.
Event time received should not be optional, I think. Tags, attributes, and external origination time are optional.
The default behavior when using this codec should be to assign a new time received, which is what happens by default when an Event is created.
There's still some missing test coverage in |
/*
public void setEventType(String eType) {
    eventType = eType;
}

public void setTimeReceived(Instant receivedTime) {
    timeReceived = receivedTime;
}
*/
Do we need this?
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.codec.eventjson;
The Java convention for package names made up of multiple words is to separate the words with an underscore. So you could rename to:
package org.opensearch.dataprepper.plugins.codec.event_json;
outputCodec.writeEvent(event, outputStream);
outputCodec.complete(outputStream);
assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON));
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), record -> {
There is no guarantee that this consumer is even called.
A simple change is to update the consumer to put the records into a collection. Then assert the size. Then assert the format.
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);
assertThat(records.size(), equalTo(1));
for (Record<Event> record : records) {
    Event e = record.getData();
    // All the same from here.
}
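To illustrate why assertions inside the consumer can pass vacuously, here is a minimal, dependency-free sketch. `LineParserSketch`, `parse`, and `collect` are hypothetical stand-ins, not the codec from this PR; the stand-in parser simply invokes the consumer once per non-empty line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an input codec: calls the consumer once per non-empty line.
class LineParserSketch {
    static void parse(final String input, final Consumer<String> consumer) {
        for (final String line : input.split("\n")) {
            if (!line.trim().isEmpty()) {
                consumer.accept(line);
            }
        }
    }

    // Collects everything the parser emits so the caller can assert on the count first.
    static List<String> collect(final String input) {
        final List<String> records = new ArrayList<>();
        parse(input, records::add);
        return records;
    }

    public static void main(final String[] args) {
        // The consumer is never invoked for empty input, so a check placed
        // inside the lambda does not run and the call completes silently.
        parse("", record -> { throw new AssertionError("never reached"); });

        // Collecting first makes a missing record visible through the size.
        System.out.println(collect("").size());
        System.out.println(collect("first\nsecond").size());
    }
}
```

With the records collected up front, a size assertion fails as soon as the parser emits nothing, which the in-lambda form cannot detect.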
public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException {
    Objects.requireNonNull(outputStream);
    generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
    generator.writeStartObject();
It appears that you are starting an object. But, there is no array. So won't it look like this?
{{"data":..., "metadata"...},{"data":..., "metadata"...},{"data":..., "metadata"...}}
Good point. Modified to generate an array.
outputCodec.start(outputStream, null, null);
outputCodec.writeEvent(event, outputStream);
outputCodec.complete(outputStream);
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), record -> {
Similar comment as below about the assertions here.
}

@Test
public void extendedTest() throws Exception {
We should have a test that operates against multiple events.
import java.util.UUID;
import java.util.stream.Collectors;

public class EventJsonInputOutputCodecTest {
I like that this test is asserting symmetry between the codecs.
We should also have test suites for the individual codecs with known values to ensure that the format is consistent. That is, somebody could make a change which causes the input codec to no longer work with the same values. The current tests would pass, but we would have broken backward compatibility which could pose problems when using this to write to S3.
 */
package org.opensearch.dataprepper.plugins.codec.eventjson;

public class EventJsonOutputCodecConfig {
Do we need this?
Just keeping it for future use where we may want to control what fields should be included in the output.
public void emptyTest() throws Exception {
    ByteArrayInputStream inputStream = new ByteArrayInputStream("".getBytes());
    inputCodec = createInputCodec();
    inputCodec.parse(inputStream, record -> { });
Make these different tests for isolation:
@ParameterizedTest
@ValueSource(strings = { "[]", "[{}]", "{}" })
...
inputCodec = createInputCodec();
inputCodec.parse(inputStream, record -> { });
inputStream = new ByteArrayInputStream("[]".getBytes());
inputCodec.parse(inputStream, record -> { });
We should do something to assert this.
Probably:
Consumer<Record<Event>> consumer = mock(Consumer.class);
inputCodec.parse(inputStream, consumer);
verifyNoInteractions(consumer);
@@ -44,22 +44,24 @@ public String getExtension() {
 public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException {
     Objects.requireNonNull(outputStream);
     generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
-    generator.writeStartObject();
+    generator.writeStartArray();
We should keep the writeStartObject. This could allow us to also include metadata such as the version.
e.g.
{
"metadata" : {"version" : "2.8"},
"events" : [
{"data":...,"metadata"},
{"data":...,"metadata"},
{"data":...,"metadata"},
]
}
See this code:
Lines 55 to 57 in f9a3a60
generator.writeStartObject();
generator.writeFieldName(config.getKeyName());
generator.writeStartArray();
Though, I don't think we need to make the events key configurable.
Good point. But the following should be enough.
{
"version" : "2.8",
"events" : [
{"data":...,"metadata"},
{"data":...,"metadata"},
{"data":...,"metadata"},
]
}
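A rough sketch of producing that envelope, for illustration only. It uses plain string joining rather than the Jackson JsonGenerator the codec uses, and `EventJsonEnvelopeSketch` and `wrap` are hypothetical names. It assumes each event is already serialized to a JSON object string and that the version contains only digits and dots, so no escaping is attempted.

```java
import java.util.List;

class EventJsonEnvelopeSketch {
    // Wraps already-serialized event objects as {"version": "...", "events": [...]}.
    // Assumption: version needs no JSON escaping and each entry is valid JSON.
    static String wrap(final String version, final List<String> serializedEvents) {
        return "{\"version\":\"" + version + "\",\"events\":["
                + String.join(",", serializedEvents)
                + "]}";
    }

    public static void main(final String[] args) {
        System.out.println(wrap("2.8", List.of(
                "{\"data\":{},\"metadata\":{}}",
                "{\"data\":{},\"metadata\":{}}")));
    }
}
```

Keeping a top-level object leaves room to add further fields next to "version" later without changing how the "events" array is read.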
if (data == null) {
    return null;
}
if (overrideTimeReceived) {
I think you want to run this part if override_time_received is false
You mean, the overrideTimeReceived flag "true" means it will override with the value from the data and "false" means it will take Instant.now()?
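For reference, the interpretation stated in this question could be sketched as below. This is only an illustration of that one reading: the thread does not settle which meaning the flag finally takes, and `TimeReceivedSketch` and `resolve` are hypothetical names, not code from the PR. The current time is passed in as a parameter so the behavior is easy to test.

```java
import java.time.Instant;

class TimeReceivedSketch {
    // Under the reading in the question: when the flag is true and the data carries
    // a time-received string, that value is used; otherwise the supplied "now" is used.
    static Instant resolve(final boolean overrideTimeReceived,
                           final Object storedTimeReceived,
                           final Instant now) {
        if (overrideTimeReceived && storedTimeReceived instanceof String) {
            return Instant.parse((String) storedTimeReceived);
        }
        return now;
    }

    public static void main(final String[] args) {
        final Instant now = Instant.now();
        System.out.println(resolve(true, "2024-04-17T10:15:30Z", now));
        System.out.println(resolve(false, "2024-04-17T10:15:30Z", now));
    }
}
```

Under the opposite reading, the condition would simply be negated.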
}

public EventJsonInputCodec createInputCodec() {
    when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(false);
Let's move this to setup and then add a test for when the override_time_received is true
Modified as per the suggestion
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);
final List<Map<String, Object>> maps = (List<Map<String, Object>>)innerJson.get(EventJsonDefines.EVENTS);
final String version = (String)innerJson.get(EventJsonDefines.VERSION);
if (!version.equals(DataPrepperVersion.getCurrentVersion().toString())) {
Do we need strict matching here? One thought is that we could be strict on the major version and ignore the minor version.
if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) {
    final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);
    final List<Map<String, Object>> maps = (List<Map<String, Object>>)innerJson.get(EventJsonDefines.EVENTS);
    final String version = (String)innerJson.get(EventJsonDefines.VERSION);
It may be good to move this check into a small method to help make the code more readable.
@@ -48,6 +48,22 @@ public EventJsonInputCodec(final EventJsonInputCodecConfig config) {
        this.overrideTimeReceived = config.getOverrideTimeReceived();
    }

    private boolean isCompatibleVersion(Map<String, Object> json) {
        final String versionStr = (String)json.get(EventJsonDefines.VERSION);
The DataPrepperVersion class provides a lot of this.
final String versionStr = (String)json.get(EventJsonDefines.VERSION);
final DataPrepperVersion definedVersion = DataPrepperVersion.parse(versionStr);
if(definedVersion.getMajorVersion() != DataPrepperVersion.getCurrentVersion().getMajorVersion()) {
LOG.error("Version mismatch! Current version {} Received data version {}", DataPrepperVersion.getCurrentVersion(), definedVersion);
return false;
}
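The major-version-only rule can also be sketched without project classes, for readers who want to see the comparison in isolation. `VersionCompatSketch`, `majorOf`, and `isCompatible` are hypothetical helpers written for this illustration; they are not the DataPrepperVersion API, and they assume versions look like "2.8" with a numeric major part.

```java
class VersionCompatSketch {
    // Extracts the numeric major part of a version such as "2.8" or "2".
    static int majorOf(final String version) {
        final int dot = version.indexOf('.');
        final String major = dot < 0 ? version : version.substring(0, dot);
        return Integer.parseInt(major);
    }

    // Strict on the major version, lenient on the minor version.
    // A missing or unparseable version is treated as incompatible.
    static boolean isCompatible(final String currentVersion, final String receivedVersion) {
        if (currentVersion == null || receivedVersion == null) {
            return false;
        }
        try {
            return majorOf(currentVersion) == majorOf(receivedVersion);
        } catch (final NumberFormatException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(isCompatible("2.8", "2.3"));
        System.out.println(isCompatible("2.8", "3.0"));
    }
}
```

Treating a missing version as incompatible is a choice made for this sketch; the PR may handle that case differently.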
Thanks for adding this and making these improvements to the feature!
* Event Json input and output codecs
* Modified test case to check for event metadata attributes
* Modified the coverage to 0.9
* Addressed review comments
* Fixes for failing coverage tests
* Addressed review comments
* Fixed test coverage
* Added more tests
* Added more tests for coverage
* Addressed review comments
* Fixed code coverage failure
* Addressed review comments
* Addressed review comments
* Addressed review comments
---------
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Description
Support for event_json input and output codecs
Issues Resolved
Resolves #4404
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.