-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes some flaky asynchronous tests. #39
Conversation
3b1984e
to
a473e3c
Compare
Thank you for explaining Andrew. it looks good to me. 👍 |
@@ -9,6 +9,10 @@ Current | |||
------- | |||
|
|||
### Added: | |||
- [A test implementation of the `AsynchronousWorkflowsBuilder`, `TestAsynchronousWorkflowsBuilder`](http://github.com/yahoo/fili/pull/36) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR 39, yes? (also other change log entry)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heheh. Looks like I copied-pasted-forgot-to-modify.
workflowMap.put(Workflow.SYNCHRONOUS, workflows.getSynchronousPayload()); | ||
workflowMap.put(Workflow.ASYNCHRONOUS, workflows.getAsynchronousPayload()); | ||
workflowMap.put(Workflow.PRERESPONSE_READY, workflows.getPreResponseReadyNotifications()); | ||
workflowMap.put(Workflow.JOB_MARKED_COMPLETE, workflows.getJobMarkedCompleteNotifications()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might be nice to decouple this from the specific workflow steps a bit. As it stands, if a workflow adds steps, there's no way to attach a subscriber to it. I'm not sure we can do it right now, since this is Java code, and to do it cleanly we'd want to use groovy's ability to call methods as strings, but it would be a nice-to-have capability. Likely more for "do later" rather than any time soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I wasn't a particularly big fan of this, either. In Java, the best approach to handle this sort of thing may be to make the AsynchronousWorkflows
object less rigid. Give it a map that maps interfaces to Observables
, with an enum as default implementation of the interface that enumerates the current workflows, rather than having explicit fields and getters. Then, this class iterate over the map.
I consider that to be outside the scope of this PR, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that is a good way of doing it in Java land. And yeah, it can happen later. No need to slow down this PR.
def setup() { | ||
TestAsynchronousWorkflowsBuilder.addSubscriber( | ||
TestAsynchronousWorkflowsBuilder.Workflow.JOB_MARKED_COMPLETE, | ||
new Observer() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is ugly and clutters the test with more stuff than is needed to get the point across. We can do better (after all, this is Groovy):
TestAsynchronousWorkflowsBuilder.addSubscriber(TestAsynchronousWorkflowsBuilder.Workflow.JOB_MARKED_COMPLETE)
{ jobMetadataReady.countDown() }
{ throw it }
Or we can pass them as parameters if you don't like the implicit closure-passing style
TestAsynchronousWorkflowsBuilder.addSubscriber(
TestAsynchronousWorkflowsBuilder.Workflow.JOB_MARKED_COMPLETE,
{jobMetadataReady.countDown()},
{throw it}
)
To achieve this, we need to add some more helpers to the TestAsynchronousWorkflowsBuilder
:
/**
* Adds the specified subscriber to the specified workflow.
*
* @param workflow The workflow to add the countdown latch to
* @param workflowSubscriber The subscriber that should be added to the specified workflow
*/
public static void addSubscriber(Workflow workflow, Observer workflowSubscriber) {
SUBSCRIBERS.put(workflow, workflowSubscriber);
}
/**
* Adds the subscriber (specified in individual method components) to the specified workflow.
*
* @param workflow The workflow to add the countdown latch to
* @param onNext onNext method for the observer
* @param onCompleted onCompleted method for the observer
* @param onError onError method for the observer
*/
public static void addSubscriber(
Workflow workflow,
Consumer<Object> onNext,
Runnable onCompleted,
Consumer<Throwable> onError
) {
Observer workflowSubscriber = new Observer() {
@Override
public void onNext(Object next) {
onNext.accept(next);
}
@Override
public void onCompleted() {
onCompleted.run();
}
@Override
public void onError(Throwable error) {
onError.accept(error);
}
};
SUBSCRIBERS.put(workflow, workflowSubscriber);
}
/**
* Subscribes the partially-specified observer to the specified workflow.
* <p>
* Uses a no-op onCompleted method.
*
* @param workflow The workflow to add the countdown latch to
* @param onNext onNext method for the observer
* @param onError onError method for the observer
*/
public static void addSubscriber(Workflow workflow, Consumer<Object> onNext, Consumer<Throwable> onError) {
addSubscriber(workflow, onNext::accept, () -> { }, ignored -> { });
}
/**
* Subscribes the partially-specified observer to the specified workflow.
* <p>
* Uses no-op onCompleted and onError methods.
*
* @param workflow The workflow to add the countdown latch to
* @param onNext onNext method for the observer
*/
public static void addSubscriber(Workflow workflow, Consumer<Object> onNext) {
addSubscriber(workflow, onNext::accept, ignored -> { });
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that this suggestion follows into the other spec that gets updated in this PR also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! 👍
9807263
to
e08cd30
Compare
--A test version of the AsynchronousWorkflowsBuilder is introduced. This version extends the DefaultAsynchronousWorkflowsBuilder, and builds the exact same workflow. However, it also provides a mechanism for outside classes to add additional Observers to the workflows, allowing them to do such things as add countdown latches to have thread-safe tests.
e08cd30
to
1fdbde9
Compare
8e73a8b
to
f58b0bd
Compare
broadCastChannelPreResponseObservable.connect(); | ||
|
||
return broadCastChannelPreResponseObservable; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been moved into the method JobsServlet::getResults
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs note in Changelog
|
||
then: | ||
testSubscriber.assertReceivedOnNext([ticket1PreResponse]) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is subsumed by tests in the JobsServletReactiveChainforResultsEndpointSpec
that verify the results come back synchronously if they are ready within the asyncAfter
timeout.
|
||
then: | ||
testSubscriber.assertReceivedOnNext([ticket1PreResponse]) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is subsumed by a new test in the JobsServletReactiveChainforResultsEndpointSpec
then: "preResponseObservable is empty (the chain is complete, and no values were sent)" | ||
ReactiveTestUtils.assertCompletedWithoutError(testSubscriber) | ||
testSubscriber.assertNoValues() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is subsumed by tests in the JobsServletReactiveChainforResultsEndpointSpec
that verify that the getResults
method returns an empty Observable if the results are not ready within the asynchronous timeout.
} | ||
|
||
def "If the PreResponse is not available in the PreResponseStore initially and the notification from broadcastChannel is received within the async timeout, we go to the PreResponsestore twice"() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
def "If the PreResponse is available in the PreResponseStore and the notification from broadcastChannel is received within the async timeout, we go to the PreResponsestore twice"() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether or not the results are available actually has zero impact on this test. This test is testing how notifications from the broadcastChannel affects processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor indentation issues
private final String filters; | ||
|
||
/** | ||
* Parses the API request URL and generates the Api Request object. | ||
* | ||
* @param format response data format JSON or CSV. Default is JSON. | ||
* @param format response data format JSON or CSV. Default is JSON. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like an extra whitespace
got added here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
* @param asyncAfter How long the user is willing to wait for a synchronous request in milliseconds | ||
* @param perPage number of rows to display per page of results. If present in the original request, | ||
* must be a positive integer. If not present, must be the empty string. | ||
* must be a positive integer. If not present, must be the empty string. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like an indentation issue with the editor. Can we fix this as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
f58b0bd
to
6cca0ae
Compare
@@ -37,6 +41,10 @@ Current | |||
|
|||
|
|||
### Changed: | |||
- [The `TestBinderFactory` now uses the `TestAsynchronousWorkflowsBuilder`](http://github.com/yahoo/fili/pull/39) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More info needed in the changelog around the API changes
|
||
/** | ||
* An in-memory implementation of PreResponseStore mainly for testing purposes. It only provides functionality to save | ||
* an entry to store and get an entry from the store. It does not have delete functionality nor does it take care of | ||
* cleaning stale data. | ||
* <p> | ||
* Since the HashPreResponseStore is intended primarily for testing, it also includes two maps of tickets to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some thoughts here:
1.) If this is primarily for testing, we should move this class to the test
source root
2.) If this class is useful for non-testing, then we should leave it here and remove the latches (assuming they are not useful for non-testing workloads)
3.) If we want both (ie. keep this class for real use, and also give it the latch counting behavior for tests to use, then we should have a LatchedHashPreResponseStore
in the test
source root that extends this one, adding the latching behavior.
Either way, things that are only for testing don't belong in code that is in the src
root. That's what the test
root is for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
broadCastChannelPreResponseObservable.connect(); | ||
|
||
return broadCastChannelPreResponseObservable; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs note in Changelog
* subscribers. We use the replay operator so that the preResponseObservable upon connection, will begin | ||
* collecting values. | ||
* Once a new observer subscribes to the observable, it will have all the collected values replayed to it. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment should get moved inside the else
where it pertains.
* Once a new observer subscribes to the observable, it will have all the collected values replayed to it. | ||
*/ | ||
if (asyncAfter == JobsApiRequest.ASYNCHRONOUS_ASYNC_AFTER_VALUE) { | ||
return Observable.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment on why this is good to do would be nice. (It's not immediately clear why we're doing this)
.replay(1); | ||
broadcastChannelNotifications.connect(); | ||
return preResponseStore.get(ticket).switchIfEmpty( | ||
applyTimeoutIfNeeded(broadcastChannelNotifications, asyncAfter).flatMap(preResponseStore::get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure this is doing the correct thing? To me, this looks like it will wait until preResponseStore.get()
completes, and then activate the "timeout" (if needed).
I'm guessing this is now doing the right thing, but it's not immediately obvious, so more comments are likely needed around what's happening, and why it's correct. (talking about the ordering of things might not hurt either)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that before, the timeout step was injected before the preResponseStore::get
call...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracing through how this got called a bit more, I see how this is still doing the same thing, but it still wasn't obvious what was going on as far as ordering, so comments would definitely help make that better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since asyncAfter
is best effort for anything but always
(which doesn't go to the store), it doesn't really matter either way. This basically gives the asyncAfter=0
the semantics of "If the results are available, give them to me, otherwise quickly send back the async payload."
//Check the PreResponseStore to see if the PreResponse associated with the given ticket has been stored in | ||
//the PreResponseStore. If not, wait for a PreResponse for the amount of time specified in async. | ||
return preResponseStore.get(ticket).switchIfEmpty(broadCastChannelPreResponseObservable); | ||
private <T> Observable<T> applyTimeoutIfNeeded(Observable<T> observable, long asyncAfter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like what we're actually making is a new "operator" that extends the timeout and adds "always" and "never" semantics in the same parameter. With that in mind, does it makes sense for this method to take care of all 3 options? I see it looking something like this, perhaps:
private <T> Observable<T> applyTimeoutIfNeeded(Observable<T> primary, long asyncAfter, Observable<T> alternate) {
return asyncAfter == JobsApiRequest.ASYNCHRONOUS_ASYNC_AFTER_VALUE ?
// Always alternate
alternate :
asyncAfter == JobsApiRequest.SYNCHRONOUS_ASYNC_AFTER_VALUE ?
// Always primary
primary :
// Timeout-based switch
primary.timeout(asyncAfter, TimeUnit.MILLISECONDS, alternate);
}
Note that I also decoupled the "alternate" stream from the method, so that it's passed in, and gave params better names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have this method dealing with the "tri-state" behavior, then we can consolidate that logic in one place, and I think it may simplify the switching behavior a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you look closely, you'll see that the result of applyTimeOutIfNeeded
is actually being applied inside of a preResponseStore.get().switchIfEmpty
call. In other words, we aren't checking the PreResponseStore if the request is always asynchronous (there's no point), but we are in the other two cases.
So we can't really merge the three cases into a single helper method without more thought than is worth it. If we did, we'd probably just end up with implementing all of getResults
in applyTimeoutIfNeeded
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, makes sense
|
||
and: "We miss the notification that the preResponse is stored in the PreResponseStore" | ||
when: "We miss the notification that the preResponse is stored in the PreResponseStore" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should also be a given
piece. The only "when" involved here is the getResults
call.
and: "We receive the notification after async timeout" | ||
broadcastChannel.publish("ticket4") | ||
and: "We wait for the first attempt to get results from the store to come up empty before we add fake results" | ||
getTicket1Latch.await(30, TimeUnit.SECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we waiting for up to 30 seconds before continuing here? (note that if this timeout expires, this just returns false, even if the latch has not released.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So that the test doesn't hang indefinitely in case something goes wrong and we never actually countdown the latches. I've moved that timeout into a timeout annotation on the test.
then: "then we go to the PreResponseStore exactly once to get the ticket" | ||
1 * mockPreResponseStore.get(_) >> Observable.just(Mock(PreResponse)) | ||
and: "We wait for the results to be successfully stored before sending a ready notification" | ||
saveTicket1Latch.await(30, TimeUnit.SECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we waiting for up to 30 seconds before continuing here? (note that if this timeout expires, this just returns false, even if the latch has not released.
when: "We start the async chain" | ||
mockJobServlet.getResults("ticket4", apiRequest1.asyncAfter) | ||
//The delay is to ensure that we get the notification after async timeout | ||
Thread.sleep(1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not sleep here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is testing that things behave appropriately in the case where we have a numeric asyncAfter
. Unfortunately, we currently have no way of injecting latches or anything into the workflow. So our choices are to sleep (which the test was already doing before I simplified it), or refactor the JobsServlet so that we have some way of injecting additional components into the workflow.
While the second may or may not be a valid approach (depending on how much control we want to give clients over the JobServlet), it's outside the scope of this PR.
- [Removed `JobsApiRequest::handleBroadcastChannelNotification`](https://github.com/yahoo/fili/pull/39) | ||
* That logic does not really belong in the `JobsApiRequest` (which is responsible for modeling a response, not processing it), and has | ||
been consolidated into the `JobsServlet`. | ||
>>>>>>> Stashed changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some merge conflict markers in here still...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently (vanilla) git says nothing if it has conflicts when applying a stash. This is annoying.
private final String filters; | ||
|
||
/** | ||
* Parses the API request URL and generates the Api Request object. | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep
======= | ||
- [HashPreResponseStore moved to `test` root directory.](https://github.com/yahoo/fili/pull/39) | ||
* The `HashPreResponseStore` is really intended only for testing, and does not have capabilities (i.e. TTL) that are needed for production. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not neeed
👍 |
- [The `TestBinderFactory` now uses the `TestAsynchronousWorkflowsBuilder`](http://github.com/yahoo/fili/pull/39) | ||
* This allows the asynchronous functional tests to add countdown latches to the workflows where necessary, allowing | ||
for thread-safe tests. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an extra blank line in here that's not needed
/NotABlocker
It is good to add a test case for Apart from the above comment, it looks good to me. 👍 |
--`always` is guaranteed to return an asynchronous payload, regardless of how quickly the results come back. -- The asynchronous functional tests that expect an asynchronous result use the `always` keyword in order to ensure consistency.
0b54876
to
33787a2
Compare
--A test version of the AsynchronousWorkflowsBuilder is introduced. This
version extends the DefaultAsynchronousWorkflowsBuilder, and builds the
exact same workflow. However, it also provides a mechanism for outside
classes to add additional Observers to the workflows, allowing them to
do such things as add countdown latches to have thread-safe tests.