Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Github-issue#1048 : s3-sink with local-file buffer implementation. #2645

Merged

Conversation

deepaksahu562
Copy link
Contributor

@deepaksahu562 deepaksahu562 commented May 5, 2023

Description

  • Implementation of s3-sink local-file buffer functionality.

Issues Resolved

GitHub-issue #1048

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

/**
* Defines all the buffer types enumerations.
*/
public enum BufferTypeOptions {

INMEMORY,
LOCALFILE
LOCALFILE("local_file", new LocalFileBuffer());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the plan is to support both INMEMORY and LOCALFILE. Isn't it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are supporting both INMEMORY and LOCALFILE buffer type. The INMEMORY buffer functionality was raised as separate PR: #2623

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
@deepaksahu562 deepaksahu562 reopened this May 15, 2023
@codecov
Copy link

codecov bot commented May 15, 2023

Codecov Report

Merging #2645 (c2394dc) into main (191ede2) will increase coverage by 0.04%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##               main    #2645      +/-   ##
============================================
+ Coverage     93.56%   93.61%   +0.04%     
  Complexity     2254     2254              
============================================
  Files           262      262              
  Lines          6308     6308              
  Branches        521      521              
============================================
+ Hits           5902     5905       +3     
+ Misses          268      266       -2     
+ Partials        138      137       -1     

see 2 files with indirect coverage changes

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
@deepaksahu562
Copy link
Contributor Author

@ashoktelukuntla \ @dlvenable
Rebase the branch and all checks have been passed.
Would you please review and merge it.


LocalFileBuffer() {
try {
localFile = new File(String.valueOf(UUID.randomUUID()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a temp file here. It could be in the system default temp, or we can allow a user to configure this directory.

Java provides two createTempFile functions that can help here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Implemented using Java provided functionality.

public class LocalFileBuffer implements Buffer {

private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class);
private BufferedOutputStream bufferedOutputStream;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend making this just an OutputStream so that the code doesn't have to change if you change this stream class.

private OutputStream outputStream;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified.

*/
@Override
public void writeEvent(byte[] bytes) throws IOException {
bufferedOutputStream.write(bytes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some synchronization for these writes. Or the events may end up out-of-order.

We can add it here (and to the InMemoryBuffer). Or perhaps we can synchronize in the S3SinkService itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have synchronized in the S3SinkService itself.

Assertions.assertNotNull(localFileBufferFactory);
Buffer buffer = localFileBufferFactory.getBuffer();
Assertions.assertNotNull(buffer);
assertThat(buffer, instanceOf(Buffer.class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should assert that this is an instance of LocalFileBuffer as well.

assertThat(buffer, instanceOf(LocalFileBuffer.class))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deepaksahu562 , Did you push the change here? I don't see a new assertion for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable, Earlier, I didn't understand. Now I can understand and modify as requested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be as simple as changing this line to:

assertThat(buffer, instanceOf(LocalFileBuffer.class))

Here it is with a little more context.

Buffer buffer = localFileBufferFactory.getBuffer();
Assertions.assertNotNull(buffer);
assertThat(buffer, instanceOf(LocalFileBuffer.class))

We already know this is a Buffer. That is handled statically by Java. But, we want to know that this is returning the LocalFileBuffer and not an InMemoryBuffer (or some other buffer sub-class).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this line per my latest comment. Then we should be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your clarification, Modified as you suggested.

}
assertThat(localFileBuffer.getSize(), greaterThan(1l));
assertThat(localFileBuffer.getEventCount(), equalTo(55));
assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(0L));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to assert the exact duration here.

assertThat(localFileBuffer.getDuration(), equalTo(55 * 1000L));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

assertDoesNotThrow(() -> {
localFileBuffer.flushToS3(s3Client, BUCKET_NAME, "log.txt");
});
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to verify that the file was flushed to S3. Since this is not an integration test, you will do this via mocking.

It should look something like the following:

ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);

verify(s3Client).putObject(putObjectRequestArgumentCaptor.capture(), any(RequestBody.class));

PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue();
// add assertions for the bucket and key values.

Also, this should validate that the file was actually deleted. You will do this with real files (not mocks).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified and addressed as per suggestions.

public class LocalFileBufferFactory implements BufferFactory {
@Override
public Buffer getBuffer() {
return new LocalFileBuffer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably improve your testing by choosing the File here and passing it in as a parameter.

File = File.createTempDir(...);
return new LocalFileBuffer(file);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

@@ -15,7 +18,16 @@
*/
public enum BufferTypeOptions {

INMEMORY("in_memory", new InMemoryBuffer());
INMEMORY("in_memory", new InMemoryBuffer()),
LOCALFILE("local_file", new Object() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change to make this anonymous class? I think you can continue to make this work with new LocalFileBufferFactory. This helps with unit testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified with new LocalFileBufferFactory.

Assertions.assertNotNull(localFileBufferFactory);
Buffer buffer = localFileBufferFactory.getBuffer();
Assertions.assertNotNull(buffer);
assertThat(buffer, instanceOf(Buffer.class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deepaksahu562 , Did you push the change here? I don't see a new assertion for this.

File tempFile = null;
Buffer localfileBuffer = null;
try {
tempFile = File.createTempFile(PREFIX, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the suffix should be .log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
this.option = option.toLowerCase();
this.bufferType = bufferType;
}

public Buffer getBufferType() {
public BufferFactory getBufferType() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a good change. Thanks!

Assertions.assertNotNull(localFileBufferFactory);
Buffer buffer = localFileBufferFactory.getBuffer();
Assertions.assertNotNull(buffer);
assertThat(buffer, instanceOf(Buffer.class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this line per my latest comment. Then we should be good.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Copy link
Collaborator

@kkondaka kkondaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

@kkondaka kkondaka merged commit 9c7b3ec into opensearch-project:main May 16, 2023
26 checks passed
@dlvenable dlvenable mentioned this pull request Jun 5, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants