Add support for custom evictor and trigger policies #2653
Add support in both the streamlet and topology apis for the usage of custom eviction policies extending com.twitter.heron.api.windowing.EvictionPolicy and trigger policies extending com.twitter.heron.api.windowing.TriggerPolicy to enable user-defined windowing schemes.
As an aside, the Travis build appears to be failing due to style issues. I can make those changes, but I'm unsure whether I'm allowed to sign the CLA, and therefore contribute, given my current terms of employment. I'll find that out tomorrow.
@dancollins34 I believe the reason why travis is failing is checkstyle issues. |
Fixed Checkstyle Issues
Thank you for the advice. I've removed the style issues. |
@@ -35,6 +35,9 @@
import java.util.HashMap;
import java.util.Map;

import com.twitter.heron.api.tuple.Tuple;

Nit: please remove the extra line.
evictionPolicy, topoConf);
}

Please remove the extra line.
It will be nice to provide some examples of how to use this feature.
…On Tue, Jan 2, 2018 at 11:20 PM Boyang Jerry Peng ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
<#2653 (comment)>:
> + if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR)) {
+ evictionPolicy = (EvictionPolicy<Tuple, ?>)
+ topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR);
+ } else {
+ evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDurationMs);
+ }
+
+ if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER)) {
+ triggerPolicy = (TriggerPolicy<Tuple, ?>)
+ topoConf.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER);
+ } else {
+ triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs, manager,
+ evictionPolicy, topoConf);
+ }
+
+
@@ -237,9 +237,23 @@ protected void validate(Map<String, Object> topoConf, Count windowLengthCount, L
// validate
validate(topoConf, windowLengthCount, windowLengthDurationMs, slidingIntervalCount,
If a custom trigger or evictor is supplied, validate should not be called. Arguments would be passed directly into the custom evictor and trigger when they are initialized.
Generally looks good, but I would like to bring up a point for discussion: even when using custom triggers and evictors, should users still be required to use the existing API (e.g. setTopologyBoltsWindowLengthDurationMs(long value) in WindowingConfigs) to set window lengths and sliding intervals? Or should such parameters be passed directly into the custom trigger and evictor objects, e.g. via their respective constructors? I can see pros and cons in each approach.
I don't think there's any way you could prescribe the universe of all possible windowing parameters in that way. I think it makes a lot more sense for the only requirement to be "subclass TriggerPolicy/EvictionPolicy" instead of trying to restrict what each of those could be. To your comment about validate, I think it might make sense to make this a three-condition if statement, i.e. if both the eviction and trigger policies are defined in the config, use them; if neither is defined, run validate and the code currently in the master branch; and if only one of them is defined, raise an exception. I'll fix the blank lines, make the above change, and try to write some example code tonight.
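The three-way check proposed in this comment could be sketched roughly as follows. This is a self-contained illustration, not the code from the PR: the class name, the `Mode` enum, and the two string keys are hypothetical stand-ins (the real keys are the `TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR` and `TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER` constants in `WindowingConfigs`), and the sketch only reports which branch would be taken.

```java
import java.util.HashMap;
import java.util.Map;

public class PolicySelectionSketch {
  // Hypothetical keys, standing in for the WindowingConfigs constants.
  static final String CUSTOM_EVICTOR = "custom.evictor";
  static final String CUSTOM_TRIGGER = "custom.trigger";

  /** Which branch the executor would take for a given config. */
  enum Mode { CUSTOM, DEFAULT }

  static Mode selectMode(Map<String, Object> topoConf) {
    boolean hasEvictor = topoConf.containsKey(CUSTOM_EVICTOR);
    boolean hasTrigger = topoConf.containsKey(CUSTOM_TRIGGER);
    if (hasEvictor && hasTrigger) {
      // Both supplied: use the user's policies and skip validate().
      return Mode.CUSTOM;
    }
    if (!hasEvictor && !hasTrigger) {
      // Neither supplied: validate the window settings and build the built-in policies.
      return Mode.DEFAULT;
    }
    // Exactly one supplied: reject the configuration.
    throw new IllegalArgumentException(
        "Custom eviction and trigger policies must be provided together");
  }

  public static void main(String[] args) {
    Map<String, Object> conf = new HashMap<>();
    System.out.println(selectMode(conf)); // DEFAULT
    conf.put(CUSTOM_EVICTOR, new Object());
    conf.put(CUSTOM_TRIGGER, new Object());
    System.out.println(selectMode(conf)); // CUSTOM
  }
}
```

Rejecting the one-policy case up front avoids pairing a user-defined policy with a built-in one whose window parameters were never set.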
…utor to require both Policies to be provided if one of them is
@jerrypeng - can you please re-review again? |
So, I've noticed something else about TriggerPolicies that I had missed on my first go round. There is no way for the API user to actually instantiate an instance of a custom TriggerPolicy to put it into the WindowConfig object because they would first need to have a reference to the WindowManager to act as a TriggerHandler. I'm writing a commit attempting to address this now. |
…ush then pull clean copy)
…ndowManager, Config object, TriggerHandler and EvictionPolicy
@dancollins34 - some errors have been occurring in the unit tests. Can you please fix them? |
It built on my machine, so that's weird. I'll fix it tonight.
Looks like it just timed out. Re running build |
I see the failures now. Attempting to address them now. |
…ning "started" again for CountTriggerPolicy
It looks like this one failed on a C++ test? I'm not sure how that's possible, since I haven't modified the C++ code in any way.
Example usage:

// Fires the window with probability triggerChance on each non-watermark event.
public class RandomTriggerPolicy<T extends Serializable>
    extends AbstractBaseTriggerPolicy<T, Boolean> {
  Float triggerChance;
  Random sampler;

  public RandomTriggerPolicy(Float triggerChance) {
    super(false, false, false);
    if (triggerChance > 1.0 || triggerChance < 0.0) {
      throw new IllegalArgumentException("Cannot have a likelihood outside of 0 to 1 range");
    }
    this.triggerChance = triggerChance;
    this.sampler = new Random();
  }

  @Override
  public void track(Event<T> event) {
    if (started && !event.isWatermark()) {
      if (sampler.nextDouble() < triggerChance) {
        this.handler.onTrigger();
      }
    }
  }

  @Override
  public void reset() {
    // NOOP
  }

  @Override
  public void shutdown() {
    // NOOP
  }

  @Override
  public Boolean getState() {
    return false;
  }

  @Override
  public void restoreState(Boolean state) {
    // NOOP
  }
}

// Never expires an event: every event is processed in the current window.
public class AllEvictionPolicy<T extends Serializable> implements EvictionPolicy<T, Boolean> {
  private EvictionContext context;

  @Override
  public Action evict(Event<T> event) {
    return Action.PROCESS;
  }

  @Override
  public void track(Event<T> event) {
    // NOOP
  }

  @Override
  public void setContext(EvictionContext context) {
    this.context = context;
  }

  @Override
  public EvictionContext getContext() {
    return context;
  }

  @Override
  public void reset() {
    // NOOP
  }

  @Override
  public Boolean getState() {
    return true;
  }

  @Override
  public void restoreState(Boolean state) {
    // NOOP
  }
}

public class Main {
  public static void main(String[] args) {
    Builder builder = Builder.newBuilder();
    builder.newSource(() -> 1).reduceByKeyAndWindow(
        value -> value,
        value -> value,
        WindowConfig.CustomWindow(new RandomTriggerPolicy<>(.5f), new AllEvictionPolicy<>()),
        (x, y) -> x + y
    ).log();
  }
}
Not sure what the build failure was. Have kicked the build again. |
@dancollins34 - can you add examples of how to use these under examples/ and storm-compatibility-examples? This will give developers code that they can understand and quickly copy and modify if needed.
*
* @param windowManager the window manager
*/
void setWindowManager(WindowManager<T> windowManager);
I think we should remove this from the interface. Having this method is confusing to users since WindowManager implements the TriggerHandler interface. WindowManager is also not an interface that users can implement, so it shouldn't be part of an interface that users have to implement. I think having "setTriggerHandler" should suffice. For built-in triggers (e.g. WatermarkCountTriggerPolicy, WatermarkTimeTriggerPolicy), we can just pass in the WindowManager via their constructors.
The reasoning behind this was that the WindowManager, TriggerHandler (which is the WindowManager) and TopologyConfig are three things the users may not have access to directly. I was aiming to provide access to the same structures used in the internal TriggerPolicies to custom TriggerPolicies, should users wish to use them. The idea was that, if someone wanted to, it should be possible to use the existing trigger policies in this manner.
Looking more closely at the code, I am confused about how a custom trigger policy will work. All the existing trigger policies require the WindowManager to actually trigger windows. How will a user implementing a custom TriggerPolicy do this? They have no reference to the WindowManager. Shouldn't the WindowManager be passed into the custom trigger implicitly? As of right now, trigger policies can't trigger windows without the WindowManager. That is why I suggested removing setWindowManager from the interface, since trigger policies need the window manager regardless of what kind of trigger policy they are.
Take a look at the changes I've made to WindowedBoltExecutor. Essentially, to allow these to attach to arbitrary trigger policies, I've replaced their inclusion in the constructor with the setters you see in the interface for all trigger policies.
I see
import com.twitter.heron.api.windowing.WindowManager;

public abstract class AbstractBaseTriggerPolicy<T extends Serializable, S>
Not sure if this interface is necessary, if we remove WindowManager from the TriggerPolicy interface
Regardless, you would still need a way to get the TriggerHandler (the WindowManager) attached to the custom TriggerPolicy that was passed to the config. AbstractBaseTriggerPolicy was just made as a way to standardize how these attachments were handled for the internal classes, and to provide a way to use those same attachment variables in custom classes.
@dancollins34 thanks for your work! Generally looks good. I just have a couple of comments for you.
Will address @kramasamy comments and add to /examples following resolution of concerns raised by @jerrypeng |
@dancollins34 - it will be nice if you could add some documentation on how to use this in a separate PR. |
@dancollins34 thanks for replying to my comments. I had a question regarding the example you provided above for a custom trigger. The TriggerHandler is never set, but you call the handler in the method track: "this.handler.onTrigger();". How is that going to work? Or am I missing something?
Yes, it is set, in the WindowedBoltExecutor. Take a look at those changes.
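The attachment described in this exchange, where the user constructs the policy without a handler and the executor attaches one later through a setter, could look roughly like the sketch below. Everything here is a simplified stand-in written for illustration: the nested `TriggerHandler` and `TriggerPolicy` interfaces only mimic the shape of the Heron types, and `EveryNthTriggerPolicy`, `CountingWindowManager`, and `attach` are hypothetical names that do not appear in the PR.

```java
public class HandlerInjectionSketch {
  /** Stand-in for the trigger handler role that the window manager plays. */
  interface TriggerHandler {
    boolean onTrigger();
  }

  /** Simplified stand-in for a trigger policy that receives its handler via a setter. */
  interface TriggerPolicy<T> {
    void setTriggerHandler(TriggerHandler handler);
    void start();
    void track(T event);
  }

  /** Hypothetical custom policy: fires on every n-th tracked event. */
  static class EveryNthTriggerPolicy<T> implements TriggerPolicy<T> {
    private final int n;
    private int seen;
    private boolean started;
    private TriggerHandler handler;

    // User-facing constructor: only the policy's own parameter, no handler.
    EveryNthTriggerPolicy(int n) {
      if (n <= 0) {
        throw new IllegalArgumentException("n must be positive");
      }
      this.n = n;
    }

    @Override
    public void setTriggerHandler(TriggerHandler handler) {
      this.handler = handler;
    }

    @Override
    public void start() {
      started = true;
    }

    @Override
    public void track(T event) {
      if (started && ++seen % n == 0) {
        handler.onTrigger();
      }
    }
  }

  /** Stand-in for the window manager: counts how often it was triggered. */
  static class CountingWindowManager implements TriggerHandler {
    int triggers;

    @Override
    public boolean onTrigger() {
      triggers++;
      return true;
    }
  }

  /** The executor's side: attach the handler to the user-built policy, then start it. */
  static <T> void attach(TriggerPolicy<T> policy, TriggerHandler manager) {
    policy.setTriggerHandler(manager);
    policy.start();
  }

  public static void main(String[] args) {
    CountingWindowManager manager = new CountingWindowManager();
    TriggerPolicy<String> policy = new EveryNthTriggerPolicy<>(2); // user code
    attach(policy, manager);                                       // executor code
    for (String event : new String[] {"a", "b", "c", "d", "e"}) {
      policy.track(event);
    }
    System.out.println(manager.triggers); // prints 2
  }
}
```

The point of the setter is that user code never needs a reference to the window manager: the policy stays inert until the executor has attached a handler and started it.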
/**
* Set the requirements in the constructor
*/
public AbstractBaseTriggerPolicy(boolean requiresEvictionPolicy,
Not sure if we need these checks. I think it should be up to the user to make sure to pass in the correct objects he or she needs for a custom trigger. Having to pass these booleans into the constructor also makes the interface confusing to someone reading the code. If you really want these checks in place, I would suggest using setters to set these. Then it is clear to the reader of the code what is being set.
@jerrypeng These checks were in place to keep the existing trigger policies as restrictive as they were before wrt which of the objects (evictionPolicy, windowManager) were attached. I could provide an alternate no-arg constructor that sets them all to false, then use setters instead? However, since this is the abstract class which already partially implements the TriggerPolicy interface, I would think it would be okay to perform some checks, but I have no issue with removing them either.
Let's remove them to keep the interface as simple and clean as possible.
@jerrypeng will do
@dancollins34 thanks for the clarification. I am +1 on this PR |
@jerrypeng - Can I merge this? |
@kramasamy we can once the last comment is addressed |
+1 @kramasamy we can merge |
* Add support for custom evictor and trigger policies
  Add support in both the streamlet and topology apis for the usage of custom eviction policies extending com.twitter.heron.api.windowing.EvictionPolicy and trigger policies extending com.twitter.heron.api.windowing.TriggerPolicy to enable user-defined windowing schemes.
* Fixed Checkstyle Issues
* Fixed extra line errors, reorganized if statement in WindowedBoltExecutor to require both Policies to be provided if one of them is
* Reversed policy order to fix null pointer exception
* Trial push (local compilation not working for some reason, going to push then pull clean copy)
* Added Changes necessary to allow custom trigger policies to attach WindowManager, Config object, TriggerHandler and EvictionPolicy
* Fixed test errors due to lack of setting necessary parameters and defining "started" again for CountTriggerPolicy
* Remove flags and checks on AbstractBaseTriggerPolicy
Hello,
Pursuant to my question in #2647, I've added support for custom eviction and trigger policies. The modifications I have made are:
-Add the ability to add a custom EvictionPolicy and TriggerPolicy to a WindowingConfigs
-Add an if statement to check for the existence of custom policies in the WindowingConfigs passed to a WindowedBoltExecutor
-Add the ability to add a custom trigger or evictor policy to a BaseWindowedBolt
-Add support for custom trigger and evictor policies to WindowConfig and WindowConfigImpl in the streamlet api
Thoughts? This is my first time interacting with the underlying api, but I tried to keep the methods/implementation as similar as possible to what already existed.
Daniel