Skip to content

Commit

Permalink
Tail Sampler action in Aggregate processor broken (#2761)
Browse files Browse the repository at this point in the history
* Tail Sampler action in Aggregate processor broken

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed comments. Changed config option errorCondition to condition

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka committed May 30, 2023
1 parent 6e00991 commit 269baea
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 30 deletions.
8 changes: 4 additions & 4 deletions data-prepper-plugins/aggregate-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```

### <a name="tail_sampler"></a>
* `tail_sampler`: The system processes incoming events and determines whether or not they should be allowed based on two criteria. The first criterion is based on whether or not an error condition is present. If any of the aggregated events meet this condition, then all events are allowed to be output. The second criterion is triggered when no error condition is specified or if it is false. In this case, only a subset of the events is allowed to pass through, determined by a probabilistic outcome based on the configured percent value. Since it is difficult to determine exactly when "tail sampling" should occur, the wait_period configuration parameter is used to determine when to conduct this sampling based on the idle time after the last received event. When this action is used, the aggregate `group_duration` is not relevant as the conclusion is based on the `wait_period` and not on the group duration.
* `tail_sampler`: The system processes incoming events and determines whether or not they should be allowed based on two criteria. The first criterion is based on whether or not an condition is present. If any of the aggregated events meet this condition, then all events are allowed to be output. The second criterion is triggered when no condition is specified or if it is false. In this case, only a subset of the events is allowed to pass through, determined by a probabilistic outcome based on the configured percent value. Since it is difficult to determine exactly when "tail sampling" should occur, the wait_period configuration parameter is used to determine when to conduct this sampling based on the idle time after the last received event. When this action is used, the aggregate `group_duration` is not relevant as the conclusion is based on the `wait_period` and not on the group duration.
* It supports the following config options
* `percent`: percent of events to be allowed during aggregation window
* `wait_period`: minimum idle time before tail sampling is triggered
* `error_condition`: optional condition to indicate the error case for tail sampling
* When the following three events arrive with `percent` is set to 33, and no error condition specified (or error condition evaluates to false)
* `condition`: optional condition, if present and evalutes to true for an event, then the event will be included in the sampling
* When the following three events arrive with `percent` is set to 33, and no condition specified (or condition evaluates to false)
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
Expand All @@ -207,7 +207,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
```
* When the following three events arrive with in one second and the `error_condition` is set to `/bytes > 3000`
* When the following three events arrive with in one second and the `condition` is set to `/bytes > 3000`
```json
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,34 @@

import java.util.List;
import java.util.ArrayList;
import java.util.Random;
import java.time.Duration;
import java.time.Instant;

/**
* An AggregateAction that combines multiple Events into a single Event. This action
*
* @since 2.1
* @since 2.3
*/
@DataPrepperPlugin(name = "tail_sampler", pluginType = AggregateAction.class, pluginConfigurationType = TailSamplerAggregateActionConfig.class)
public class TailSamplerAggregateAction implements AggregateAction {
static final String LAST_RECEIVED_TIME_KEY = "last_received_time";
static final String SHOULD_CONCLUDE_CHECK_SET_KEY = "should_conclude_check_set";
static final String EVENTS_KEY = "events";
static final String ERROR_STATUS_KEY = "error_status";
private final double percent;
private final int percent;
private final Duration waitPeriod;
private final ExpressionEvaluator expressionEvaluator;
private final String errorCondition;
private final String condition;
private boolean shouldCarryGroupState;
private Random random;

@DataPrepperPluginConstructor
public TailSamplerAggregateAction(final TailSamplerAggregateActionConfig tailSamplerAggregateActionConfig, final ExpressionEvaluator expressionEvaluator) {
percent = tailSamplerAggregateActionConfig.getPercent();
waitPeriod = tailSamplerAggregateActionConfig.getWaitPeriod();
errorCondition = tailSamplerAggregateActionConfig.getErrorCondition();
condition = tailSamplerAggregateActionConfig.getCondition();
this.random = new Random();
this.expressionEvaluator = expressionEvaluator;
shouldCarryGroupState = true;
}
Expand All @@ -60,7 +63,7 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
List<Event> events = (List)groupState.getOrDefault(EVENTS_KEY, new ArrayList<>());
events.add(event);
groupState.put(EVENTS_KEY, events);
if (errorCondition != null && !errorCondition.isEmpty() && expressionEvaluator.evaluateConditional(errorCondition, event)) {
if (condition != null && !condition.isEmpty() && expressionEvaluator.evaluateConditional(condition, event)) {
groupState.put(ERROR_STATUS_KEY, true);
}
groupState.put(LAST_RECEIVED_TIME_KEY, Instant.now());
Expand All @@ -70,7 +73,11 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
@Override
public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
GroupState groupState = aggregateActionInput.getGroupState();
return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of()));
int randomInt = random.nextInt(100);
if (((groupState.containsKey(ERROR_STATUS_KEY) && (Boolean)groupState.get(ERROR_STATUS_KEY) == true)) || (randomInt < percent)) {
return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of()));
}
return new AggregateActionOutput(List.of());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,30 @@ public class TailSamplerAggregateActionConfig {

@JsonProperty("percent")
@NotNull
private double percent;
private Integer percent;

@JsonProperty("error_condition")
private String errorCondition;
@JsonProperty("condition")
private String condition;

@AssertTrue(message = "Percent value must be greater than 0.0 and less than 100.0")
boolean isPercentValid() {
return percent > 0.0 && percent < 100.0;
}

public double getPercent() {
public Integer getPercent() {
return percent;
}

@AssertTrue(message = "Wait period value must be greater than 0 and less than 600")
@AssertTrue(message = "Wait period value must be greater than 0 and less than 60")
boolean isWaitPeriodValid() {
return waitPeriod.getSeconds() > 0 && waitPeriod.getSeconds() <= 600;
return waitPeriod.getSeconds() > 0 && waitPeriod.getSeconds() <= 60;
}

public Duration getWaitPeriod() {
return waitPeriod;
}

public String getErrorCondition() {
return errorCondition;
public String getCondition() {
return condition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.CountAggregateActionConfig;
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PercentSamplerAggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PercentSamplerAggregateActionConfig;
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.TailSamplerAggregateActionConfig;
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.TailSamplerAggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterMode;
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterAggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterAggregateActionConfig;
Expand All @@ -54,20 +56,24 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.collection.IsIn.in;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.doAnswer;

/**
* These integration tests are executing concurrent code that is inherently difficult to test, and even more difficult to recreate a failed test.
Expand All @@ -81,11 +87,14 @@ public class AggregateProcessorIT {
private static final int NUM_UNIQUE_EVENTS_PER_BATCH = 8;
private static final int NUM_THREADS = 100;
private static final int GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE = 2;
private static final int ERROR_STATUS = 2;
@Mock
private AggregateProcessorConfig aggregateProcessorConfig;

@Mock
RateLimiterAggregateActionConfig rateLimiterAggregateActionConfig;
@Mock
TailSamplerAggregateActionConfig tailSamplerAggregateActionConfig;

private AggregateAction aggregateAction;
private PluginMetrics pluginMetrics;
Expand Down Expand Up @@ -502,6 +511,55 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch
}
}

@ParameterizedTest
@ValueSource(ints = {20, 40, 60})
void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedException, NoSuchFieldException, IllegalAccessException {
final Duration testWaitPeriod = Duration.ofSeconds(3);
final String testCondition = "/status == "+ERROR_STATUS;
when(tailSamplerAggregateActionConfig.getPercent()).thenReturn(testPercent);
when(tailSamplerAggregateActionConfig.getWaitPeriod()).thenReturn(testWaitPeriod);
when(tailSamplerAggregateActionConfig.getCondition()).thenReturn(testCondition);
doAnswer(a -> {
Event event = (Event)a.getArgument(1);
return event.get("status", Integer.class) == ERROR_STATUS;
}).when(expressionEvaluator).evaluateConditional(eq(testCondition), any(Event.class));
aggregateAction = new TailSamplerAggregateAction(tailSamplerAggregateActionConfig, expressionEvaluator);
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))).thenReturn(aggregateAction);
when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(List.of("traceId"));
final AggregateProcessor objectUnderTest = createObjectUnderTest();
final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);

final int numberOfErrorTraces = 2;
final int numberOfSpans = 5;
eventBatch = getBatchOfEventsForTailSampling(numberOfErrorTraces, numberOfSpans);
objectUnderTest.doExecute(eventBatch);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS);

for (int i = 0; i < NUM_THREADS; i++) {
executorService.execute(() -> {
final List<Record<Event>> recordsOut = (List<Record<Event>>) objectUnderTest.doExecute(eventBatch);
countDownLatch.countDown();
});
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));
List<Event> errorEventList = eventBatch.stream().map(Record::getData).filter(event -> {
Event ev = ((Event)event);
return ev.get("status", Integer.class) == ERROR_STATUS;
}).collect(Collectors.toList());
Thread.sleep(testWaitPeriod.toMillis()*2);
Collection<Record<Event>> results = objectUnderTest.doExecute(new ArrayList<Record<Event>>());
Set<Event> resultsSet = results.stream().map(Record::getData).collect(Collectors.toSet());
assertThat(results.size(), greaterThanOrEqualTo(numberOfErrorTraces*numberOfSpans));
assertThat(results.size(), lessThan(eventBatch.size()*(NUM_THREADS+1)));
for (final Event event : errorEventList) {
assertTrue(resultsSet.contains(event));
}
}

private List<Record<Event>> getBatchOfEvents(boolean withSameValue) {
final List<Record<Event>> events = new ArrayList<>();

Expand Down Expand Up @@ -529,4 +587,25 @@ private Map<String, Object> getEventMap(int i) {
return eventMap;
}

private List<Record<Event>> getBatchOfEventsForTailSampling(final int numberOfErrorTraces, final int numberOfSpans) {
final List<Record<Event>> events = new ArrayList<>();
final int numberOfTraces = numberOfErrorTraces + 10;

for (int i = 0; i < numberOfTraces; i++) {
final int status = (i < numberOfErrorTraces) ? ERROR_STATUS : 0;
for (int j = 0; j < numberOfSpans; j++) {
final Map<String, Object> eventMap = new HashMap<>();
eventMap.put("traceId", 10000+i);
eventMap.put("spanId", j);
eventMap.put("status", status);
final Event event = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap)
.build();
events.add(new Record<>(event));
}
}
return events;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ void setup() {

@Test
void testValidConfig() throws NoSuchFieldException, IllegalAccessException {
final double testPercent = ThreadLocalRandom.current().nextDouble(0.01, 99.9);
final int testPercent = ThreadLocalRandom.current().nextInt(1, 99);
setField(TailSamplerAggregateActionConfig.class, tailSamplerAggregateActionConfig, "percent", testPercent);
assertThat(tailSamplerAggregateActionConfig.getPercent(), equalTo(testPercent));
}

@ParameterizedTest
@ValueSource(doubles = {0.0, 100.0, -1.0, 110.0})
void testInvalidConfig(double percent) throws NoSuchFieldException, IllegalAccessException {
@ValueSource(ints = {0, 100, -1, 110})
void testInvalidConfig(int percent) throws NoSuchFieldException, IllegalAccessException {
setField(TailSamplerAggregateActionConfig.class, tailSamplerAggregateActionConfig, "percent", percent);
assertThat(tailSamplerAggregateActionConfig.isPercentValid(), equalTo(false));
}
Expand All @@ -59,15 +59,15 @@ void testWaitPeriod() throws NoSuchFieldException, IllegalAccessException {

@Test
void testErrorConditionEmptyOrNull() throws NoSuchFieldException, IllegalAccessException {
assertThat(tailSamplerAggregateActionConfig.getErrorCondition(), equalTo(null));
setField(TailSamplerAggregateActionConfig.class, tailSamplerAggregateActionConfig, "errorCondition", "");
assertTrue(tailSamplerAggregateActionConfig.getErrorCondition().isEmpty());
assertThat(tailSamplerAggregateActionConfig.getCondition(), equalTo(null));
setField(TailSamplerAggregateActionConfig.class, tailSamplerAggregateActionConfig, "condition", "");
assertTrue(tailSamplerAggregateActionConfig.getCondition().isEmpty());
}

@Test
void testValidErrorCondition() throws NoSuchFieldException, IllegalAccessException {
final String testErrorCondition = RandomStringUtils.randomAlphabetic(20);
setField(TailSamplerAggregateActionConfig.class, tailSamplerAggregateActionConfig, "errorCondition", testErrorCondition);
assertThat(tailSamplerAggregateActionConfig.getErrorCondition(), equalTo(testErrorCondition));
setField(TailSamplerAggregateActionConfig.class, tailSamplerAggregateActionConfig, "condition", testErrorCondition);
assertThat(tailSamplerAggregateActionConfig.getCondition(), equalTo(testErrorCondition));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ private AggregateAction createObjectUnderTest(TailSamplerAggregateActionConfig c
@Test
void testTailSamplerAggregateBasic() throws InterruptedException {
final Duration testWaitPeriod = Duration.ofSeconds(3);
final double testPercent = 100;
final int testPercent = 100;
when(tailSamplerAggregateActionConfig.getPercent()).thenReturn(testPercent);
when(tailSamplerAggregateActionConfig.getWaitPeriod()).thenReturn(testWaitPeriod);
when(tailSamplerAggregateActionConfig.getErrorCondition()).thenReturn("");
when(tailSamplerAggregateActionConfig.getCondition()).thenReturn("");
tailSamplerAggregateAction = createObjectUnderTest(tailSamplerAggregateActionConfig);
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Expand Down Expand Up @@ -89,13 +89,13 @@ void testTailSamplerAggregateBasic() throws InterruptedException {
@Test
void testTailSamplerAggregateWithErrorCondition() throws InterruptedException {
final Duration testWaitPeriod = Duration.ofSeconds(3);
final double testPercent = 0;
final int testPercent = 0;
final String statusKey = "status";
final int errorStatusValue = 1;
final String errorCondition = "/"+statusKey+" == "+errorStatusValue;
when(tailSamplerAggregateActionConfig.getPercent()).thenReturn(testPercent);
when(tailSamplerAggregateActionConfig.getWaitPeriod()).thenReturn(testWaitPeriod);
when(tailSamplerAggregateActionConfig.getErrorCondition()).thenReturn(errorCondition);
when(tailSamplerAggregateActionConfig.getCondition()).thenReturn(errorCondition);
when(expressionEvaluator.evaluateConditional(any(String.class), any(Event.class))).thenReturn(true);
tailSamplerAggregateAction = createObjectUnderTest(tailSamplerAggregateActionConfig);
final String key = UUID.randomUUID().toString();
Expand Down

0 comments on commit 269baea

Please sign in to comment.