-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Tail Sampler action to aggregate processor #2497
Conversation
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Codecov Report
📣 This organization is not using Codecov’s GitHub App Integration. We recommend you install it so Codecov can continue to function properly for your repositories. Learn more @@ Coverage Diff @@
## main #2497 +/- ##
============================================
+ Coverage 93.39% 93.46% +0.06%
- Complexity 2122 2167 +45
============================================
Files 251 257 +6
Lines 5923 6059 +136
Branches 480 488 +8
============================================
+ Hits 5532 5663 +131
- Misses 264 267 +3
- Partials 127 129 +2 see 19 files with indirect coverage changes Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
|
||
@Override | ||
public boolean shouldCarryState() { | ||
return shouldCarryGroupState; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could result in thread issues.
If you have multiple groups at the same time, then the value of this will change between calls to concludeGroup
. So this value may not match the expected value.
I think a better approach would be to change the return type of concludeGroup
to a new class - AggregationActionOutput
.
public class AggregationActionOutput {
private List<Event> events;
private boolean shouldCarryState;
}
I'm not quite sure we do need to carry this state though. See my other comment.
@Override | ||
public List<Event> concludeGroup(final AggregateActionInput aggregateActionInput) { | ||
GroupState groupState = aggregateActionInput.getGroupState(); | ||
Duration timeDiff = Duration.between((Instant)groupState.get(LAST_RECEIVED_TIME_KEY), Instant.now()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I read this, the wait_time is deciding the time from the last received Event to now. Can we allow the AggregationAction to have a value which directs the AggegateProcessor to change the behavior of groupDuration
to be this instead? Would that solve the problem?
Right now, I'm having difficulty understanding how to relate these times as a user.
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
…ecks Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Thanks!
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 } | ||
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 } | ||
``` | ||
The following Events (all) will be allowed, and no event is generated when the group is concluded |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This documentation has two examples that allow all Events. would be nice to have an example where the error condition is not met, as it is not clear to me what the outcome in that situation is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The outcome is not deterministic because we use probabilistic sampling. That's why I used 100%.
final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock(); | ||
final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock(); | ||
|
||
Optional<Event> concludeGroupEvent = Optional.empty(); | ||
AggregateActionOutput actionOutput = new AggregateActionOutput(List.of()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Collections.emptyList here instead of List.of()
Lock getHandleEventForGroupLock() { | ||
return handleEventForGroupLock; | ||
} | ||
|
||
boolean shouldConcludeGroup(final Duration groupDuration) { | ||
if (customShouldConclude != null) { | ||
return customShouldConclude.apply(groupDuration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary for this PR, but this is a nice feature that you've added which could be used at the highest level of the aggregate processor. Perhaps we could have a parameter of conclude_when
, which takes a conditional expression, and will automatically conclude groups when this condition is true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking more about this, aggregate processor with duration
has an implicit meaning that conclusion should happen at the end of the duration. Not sure if conclude_when
makes sense when duration
is there. May be we should rename duration
to conclude_when
and accept duration as one of the possible value for conclude_when
. What do you think?
final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude); | ||
|
||
final List<Event> concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null; | ||
if (concludeGroupEvents != null && !concludeGroupEvents.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Would be nice if AggregateActionOutput
was never null, and always returned an object with at least an empty list of events to not have to do all these null checks
merging for now. Will address minor comments from Taylor in a different PR. |
Description
Add tail sampler action to aggregate processor
This processor keeps state, by collecting all events in a group, across different aggregation periods and takes action on events only after the aggregation group is idle for more than
waitPeriod
time. At this point, if the group has an error as defined byerror_condition
then all events collected are sent as concluding events. If there is no error, then a probabilistic sampling is done based on the configuredpercent
value.Example configuration for the processor -
Resolves #2572
Issues Resolved
#2572
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.