-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[issue-67] unit tests for FlinkPravegaWriter #76
Conversation
- consolidate existing writer tests into FlinkPravegaWriterITCase - develop a test harness for sink functions - develop new unit tests for FlinkPravegaWriter - adjust checkstyle to allow the use of mockito - make internal fields and classes of FlinkPravegaWriter visible for testing - fix style check failures
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, I only have a few simple comments and clarification questions.
@@ -58,6 +58,10 @@ | |||
|
|||
private static final long DEFAULT_TX_SCALE_GRACE_MILLIS = 10 * 60 * 1000; // 10 minutes | |||
|
|||
// Writer interface to assist exactly-once and atleast-once functionality |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at-least-once semantics
.
private List<Integer> readAllEvents(final String streamName) throws Exception { | ||
Preconditions.checkNotNull(streamName); | ||
|
||
// TODO: Remove the end marker workaround once the following issue is fixed: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pravega issue #408 has been resolved, but I wonder if we should use end of data. does it make sense to use the termination implemented here in pravega/pravega#1916 ?
List<Integer> readElements = readAllEvents(streamName); | ||
|
||
// Now verify that all expected events are present in the stream. Having extra elements are fine since we are | ||
// testing the atleast once writer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at-least-once
* Integration tests for {@link FlinkPravegaWriter}. | ||
*/ | ||
@Slf4j | ||
public class FlinkPravegaWriterITCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does IT
stands for in the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, 'ITCase' is a convention in Flink to designate integration tests.
verify(internalWriter).open(); | ||
|
||
// verify that exceptions don't interfere with close | ||
Mockito.doThrow(new IntentionalRuntimeException()).when(internalWriter).close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what this is doing. Is it the case that the first Mockito.doThrow
will always throw, in which case the following call is not exercised? I only need a clarification of what the intention is here, please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intention here is to exercise the try..catch blocks within FlinkPravegaWriter::close()
that ensure that everything is definitely closed (by suppressing the exceptions). The mock statement causes the internal writer to throw upon close
. Likewise the executor service is compelled to throw upon shutdown
.
Mockito.doThrow(new IntentionalRuntimeException()).when(context.executorService).shutdown(); | ||
} | ||
Assert.fail("expected an exception"); | ||
} catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, what's suppose to be throwing IOException
?
.addSink(pravegaWriter).setParallelism(2); | ||
|
||
final long executeStart = System.nanoTime(); | ||
System.out.println(env.getExecutionPlan()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use slf4j?
…into issue-67-2
Closes #67
Change log description
Purpose of the change
How to verify it
N/A