Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes a bug in the S3 sink where events without handles throw NPE #2814

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ void output(Collection<Record<Event>> records) {
final byte[] encodedBytes = encodedEvent.getBytes();

currentBuffer.writeEvent(encodedBytes);
bufferedEventHandles.add(event.getEventHandle());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a unit test to confirm this is fixed? And help prevent future NPEs around this leak into this plugin?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually tried this, but I was not able to get it to crash such that the test would actually show any verification.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null event handle would be the default case when not using end-to-end acknowledgements.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmanning09 , I found that there was an unnecessary try-catch on NullPointerException. I removed that and was able to add a valid test.

if(event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
final String s3Key = generateKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
Expand All @@ -127,14 +129,13 @@ void output(Collection<Record<Event>> records) {
currentBuffer = bufferFactory.getBuffer();
}
}
} catch (NullPointerException | IOException | InterruptedException e) {
} catch (IOException | InterruptedException e) {
LOG.error("Exception while write event into buffer :", e);
Thread.currentThread().interrupt();
}
reentrantLock.unlock();
}

private void releaseEventHandles(boolean result) {
private void releaseEventHandles(final boolean result) {
for (EventHandle eventHandle : bufferedEventHandles) {
eventHandle.release(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,6 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException {
verify(snapshotSuccessCounter, times(50)).increment();
}

@Test
void test_catch_output_exception_cover() {
codec = null;
S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
assertThat(s3SinkService, instanceOf(S3SinkService.class));
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(0)).increment();
}

@Test
void test_output_with_uploadedToS3_success() throws IOException {

Expand Down Expand Up @@ -340,6 +330,35 @@ void output_will_release_all_handles_since_a_flush() throws IOException {
}
}

@Test
void output_will_skip_releasing_events_without_EventHandle_objects() throws IOException {
bufferFactory = mock(BufferFactory.class);
final Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer()).thenReturn(buffer);

final long objectSize = random.nextInt(1_000_000) + 10_000;
when(buffer.getSize()).thenReturn(objectSize);

when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());
final S3SinkService s3SinkService = createObjectUnderTest();
final Collection<Record<Event>> records = generateRandomStringEventRecord();
records.stream()
.map(Record::getData)
.map(event -> (JacksonEvent) event)
.forEach(event -> event.setEventHandle(null));

s3SinkService.output(records);

final Collection<Record<Event>> records2 = generateRandomStringEventRecord();
s3SinkService.output(records2);

final List<EventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());

for (EventHandle eventHandle : eventHandles2) {
verify(eventHandle).release(true);
}
}

@Test
void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOException {
bufferFactory = mock(BufferFactory.class);
Expand Down