Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -591,21 +593,102 @@ private void maybeReportUnexpectedExhaustion(String message) {

/**
* Helper class to manage the lifecycle of conditions with automatic cleanup.
*
* <p>Before the aggregate completes, {@link #getFuture()} returns a
* <em>fresh</em> {@link CompletableFuture} per call. This matters because
* the run loop calls {@code CompletableFuture.anyOf(getFuture(),
* synchronizerNext)} on every iteration: if {@code getFuture()} returned
* the shared underlying aggregate while it was still pending, each
* {@code anyOf} call would permanently attach an {@code OrRelay}
* {@code Completion} to its {@code stack}. On a healthy primary
* synchronizer that streams ChangeSets without ever arming the fallback
* timer, the aggregate never completes, so those Completion nodes would
* accumulate monotonically for the synchronizer's full tenure -- a real
* memory leak proportional to event rate.
*
* <p>After the aggregate completes, {@link #getFuture()} returns the
* aggregate directly: any continuation registered on an already-completed
* CompletableFuture fires synchronously at registration time and is
* removed from the stack immediately by {@code cleanStack}, so the same
* accumulation cannot happen.
*
* <p>Fresh pre-completion futures are tracked in a {@link WeakHashMap}-backed
* set, so a fresh future whose only strong references were in the caller's
* loop iteration becomes garbage-collectable -- and automatically removed
* from {@code pending} -- once that iteration ends.
*
* <p>Package-private (rather than private) so that direct unit tests can
* exercise the API surface and assert per-call distinctness.
*/
private static class Conditions implements AutoCloseable {
static class Conditions implements AutoCloseable {
// Conditions whose execute() futures are combined into the aggregate; each
// one is closed by close().
private final List<Condition> conditions;
// NOTE(review): this view is a rendered diff without +/- markers.
// `conditionsFuture` appears to be the pre-change field that `aggregate`
// (next declaration) replaces -- confirm against the actual file before
// relying on either declaration.
private final CompletableFuture<Object> conditionsFuture;
// anyOf over every condition's execute() future, or a future that this class
// never completes when the condition list is empty (see the constructor).
private final CompletableFuture<Object> aggregate;
// Monitor guarding every access to `pending`.
private final Object lock = new Object();

/**
* Tracks futures previously returned by {@link #getFuture()} that have
* not yet been completed. Held weakly via {@link WeakHashMap} so that
* fresh futures abandoned by the caller (the typical end-of-iteration
* case) become GC-collectable. Set to {@code null} by the aggregate's
* completion listener once it has copied the surviving entries out for
* completion, so {@code null} means "the aggregate has already fired".
* Read and mutated only under {@code lock}.
*/
private Set<CompletableFuture<Object>> pending =
Collections.newSetFromMap(new WeakHashMap<>());

/**
* Builds the aggregate future from the given conditions and installs the
* single listener that fans the aggregate's outcome out to every future
* handed out by getFuture() that is still tracked in {@code pending}.
*
* @param conditions conditions to race; the list is stored by reference, not
*     copied. An empty list yields an aggregate that is never completed by
*     this class.
*/
public Conditions(List<Condition> conditions) {
this.conditions = conditions;
// NOTE(review): this view is a rendered diff without +/- markers. The
// `conditionsFuture` assignment below, and the first of the two identical
// `conditions.stream()...` lines further down, appear to be the removed
// pre-change lines; the `aggregate` assignment is the live one -- confirm
// against the actual file.
this.conditionsFuture = conditions.isEmpty()
this.aggregate = conditions.isEmpty()
? new CompletableFuture<>() // Never completes if no conditions
: CompletableFuture.anyOf(
conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));
conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));

// Single permanent listener. This is the only Completion node ever
// attached to aggregate.stack while the aggregate is still pending
// -- subsequent pre-completion getFuture() calls do not touch the
// aggregate at all.
this.aggregate.whenComplete((result, throwable) -> {
List<CompletableFuture<Object>> snapshot;
synchronized (lock) {
// Defensive guard: in the visible code only this listener ever sets
// pending to null, so this is not expected to be true on entry.
if (pending == null) {
return;
}
// Copy under the lock: the ArrayList holds strong
// references so entries that survived GC to this point
// stay alive until we complete them below.
snapshot = new ArrayList<>(pending);
// From here on getFuture() sees null and returns the aggregate itself.
pending = null;
}
// Fan-out runs after the lock is released, so continuations attached to
// these futures execute without `lock` held. The aggregate's outcome is
// mirrored: exceptional stays exceptional, normal stays normal.
for (CompletableFuture<Object> cf : snapshot) {
if (throwable != null) {
cf.completeExceptionally(throwable);
} else {
cf.complete(result);
}
}
});
}

/**
* Returns a future that will complete when the underlying aggregate
* condition fires. Pre-completion, this is a fresh future per call;
* post-completion, this is the aggregate itself (already done).
*
* <p>Each fresh future is registered in {@code pending} so that the
* listener installed by the constructor can complete it -- normally or
* exceptionally -- with the aggregate's outcome.
*
* @return a new, not-yet-completed future while the aggregate is pending;
*     otherwise the already-completed aggregate
*/
public CompletableFuture<Object> getFuture() {
// NOTE(review): this view is a rendered diff without +/- markers; the
// `return conditionsFuture;` line below appears to be the removed
// pre-change body and would make everything after it unreachable --
// confirm against the actual file.
return conditionsFuture;
// Fast path: once the aggregate is done it can be handed out directly.
if (aggregate.isDone()) {
return aggregate;
}

CompletableFuture<Object> fresh = new CompletableFuture<>();
synchronized (lock) {
if (pending == null) {
// Raced with aggregate completion between isDone() and
// the lock acquisition; aggregate is now done.
return aggregate;
}
// Registered under the same lock the listener uses to snapshot pending,
// so a fresh future is either in that snapshot or never handed out.
pending.add(fresh);
}
return fresh;
}

public void inform(FDv2SourceResult result) {
Expand All @@ -615,6 +698,11 @@ public void inform(FDv2SourceResult result) {
/**
 * Closes every underlying condition and stops tracking the futures that
 * were handed out by getFuture() and are still pending.
 *
 * <p>Futures dropped here are no longer in {@code pending}, so the
 * aggregate listener will not complete them afterwards. If the aggregate
 * has already fired ({@code pending} is {@code null}) there is nothing
 * left to drop.
 */
@Override
public void close() {
    conditions.forEach(condition -> condition.close());
    synchronized (lock) {
        Set<CompletableFuture<Object>> tracked = pending;
        if (tracked == null) {
            // Aggregate already fired and drained the set.
            return;
        }
        tracked.clear();
    }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition;
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition.ConditionType;
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition;
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Unit tests that drive {@link FDv2DataSource.Conditions} directly.
 *
 * <p>Conditions is the aggregator whose future the FDv2DataSource run loop
 * races against synchronizer.next(). The loop calls getFuture() once per
 * iteration and feeds the result to CompletableFuture.anyOf(...). If
 * getFuture() handed back one shared pending instance, every anyOf call would
 * attach another Completion node to that instance's stack, so memory would
 * grow with the event rate for as long as a healthy primary synchronizer
 * stays active. These tests pin the per-call-distinct behavior and the
 * fan-out that makes it work.
 */
public class FDv2DataSourceConditionsAggregateTest {
  private ScheduledExecutorService executor;

  @Before
  public void setUp() {
    executor = Executors.newScheduledThreadPool(1);
  }

  @After
  public void tearDown() {
    executor.shutdownNow();
  }

  /**
   * Bug-proving test: each getFuture() call must yield its own instance.
   *
   * <p>With a shared instance (the pre-fix behavior), the run loop's
   * per-iteration {@code anyOf(getFuture(), syncNext)} would add a new
   * OrRelay Completion to the shared future's stack on every pass, and
   * nothing would ever remove it -- a monotonic leak while the aggregate
   * does not fire.
   */
  @Test
  public void getFutureReturnsDistinctInstancesPerCall() {
    Condition longTimeoutFallback = new FallbackCondition(executor, 60);
    try (FDv2DataSource.Conditions underTest =
        new FDv2DataSource.Conditions(Collections.singletonList(longTimeoutFallback))) {
      CompletableFuture<Object> first = underTest.getFuture();
      CompletableFuture<Object> second = underTest.getFuture();
      CompletableFuture<Object> third = underTest.getFuture();

      assertThat(first, not(sameInstance(second)));
      assertThat(second, not(sameInstance(third)));
      assertThat(first, not(sameInstance(third)));
    }
  }

  /**
   * The same distinctness must hold when there are no conditions at all (a
   * single-synchronizer configuration). The aggregate never completes in
   * that setup, which is precisely where per-iteration accumulation would
   * hurt the most.
   */
  @Test
  public void getFutureReturnsDistinctInstancesEvenWithNoConditions() {
    try (FDv2DataSource.Conditions underTest =
        new FDv2DataSource.Conditions(Collections.emptyList())) {
      CompletableFuture<Object> first = underTest.getFuture();
      CompletableFuture<Object> second = underTest.getFuture();

      assertThat(first, not(sameInstance(second)));
    }
  }

  /**
   * When the aggregate fires, every fresh future handed out beforehand must
   * complete as well. The single permanent listener's fan-out is what makes
   * fresh-per-call viable, so check that it really delivers.
   */
  @Test
  public void allFreshFuturesCompleteWhenAggregateFires() throws Exception {
    // 0-second timeout -> fires on first INTERRUPTED inform.
    Condition immediateFallback = new FallbackCondition(executor, 0);
    try (FDv2DataSource.Conditions underTest =
        new FDv2DataSource.Conditions(Collections.singletonList(immediateFallback))) {
      CompletableFuture<Object> first = underTest.getFuture();
      CompletableFuture<Object> second = underTest.getFuture();
      CompletableFuture<Object> third = underTest.getFuture();

      underTest.inform(makeInterruptedResult());

      // Collect all three outcomes before asserting on any of them.
      Object firstOutcome = first.get(2, TimeUnit.SECONDS);
      Object secondOutcome = second.get(2, TimeUnit.SECONDS);
      Object thirdOutcome = third.get(2, TimeUnit.SECONDS);

      assertIsCondition(firstOutcome);
      assertIsCondition(secondOutcome);
      assertIsCondition(thirdOutcome);
    }
  }

  /**
   * Bug-proving test for the null-sentinel issue caught by Cursor Bugbot:
   * when the aggregate completes <em>exceptionally</em>, every future from
   * getFuture() -- those handed out before the failure and those requested
   * afterwards -- must complete exceptionally as well.
   *
   * <p>Before that fix, the "fired" state was tracked through a {@code null}
   * sentinel on a {@code completedValue} field, which also remained
   * {@code null} after an exceptional completion. A later getFuture() call
   * then returned {@code CompletableFuture.completedFuture(null)}, silently
   * turning a failure into a normal {@code null} result, and the run loop's
   * downstream {@code res.getClass().getName()} would throw NPE.
   */
  @Test
  public void getFutureFailsExceptionallyWhenAggregateFailsExceptionally()
      throws Exception {
    ManualCondition controllable = new ManualCondition();
    try (FDv2DataSource.Conditions underTest =
        new FDv2DataSource.Conditions(Collections.singletonList(controllable))) {
      // Requested while the aggregate is still pending.
      CompletableFuture<Object> requestedBefore = underTest.getFuture();

      RuntimeException failure = new RuntimeException("simulated condition failure");
      controllable.future.completeExceptionally(failure);

      // Requested once the aggregate has failed: this exercises the
      // post-completion fast path of getFuture(). It is the case bugbot
      // caught: pre-fix, it returned completedFuture(null).
      CompletableFuture<Object> requestedAfter = underTest.getFuture();

      assertThrowsExecutionExceptionWithCause(requestedBefore, failure);
      assertThrowsExecutionExceptionWithCause(requestedAfter, failure);
    }
  }

  /**
   * Once the aggregate has fired, getFuture() must synchronously hand back a
   * future that is already complete (the fast path).
   */
  @Test
  public void getFutureAfterAggregateFiresReturnsCompletedFuture() throws Exception {
    // RecoveryCondition arms its timer in the constructor and fires after
    // the configured timeout. With timeout=0 it fires near-immediately.
    Condition immediateRecovery = new RecoveryCondition(executor, 0);
    try (FDv2DataSource.Conditions underTest =
        new FDv2DataSource.Conditions(Collections.singletonList(immediateRecovery))) {
      // Waiting on one future proves the aggregate has fired.
      CompletableFuture<Object> drained = underTest.getFuture();
      drained.get(2, TimeUnit.SECONDS);

      CompletableFuture<Object> afterFire = underTest.getFuture();
      assertTrue("post-fire getFuture() should be already complete", afterFire.isDone());
      Object afterFireOutcome = afterFire.get(0, TimeUnit.SECONDS);
      assertNotNull(afterFireOutcome);
    }
  }

  /** Checks that an outcome delivered by the aggregate is a non-null {@link Condition}. */
  private static void assertIsCondition(Object outcome) {
    assertNotNull(outcome);
    assertTrue(outcome instanceof Condition);
  }

  /** Builds an INTERRUPTED source result carrying a simulated network error. */
  private static FDv2SourceResult makeInterruptedResult() {
    DataSourceStatusProvider.ErrorInfo simulatedError =
        new DataSourceStatusProvider.ErrorInfo(
            DataSourceStatusProvider.ErrorKind.NETWORK_ERROR,
            0,
            "simulated",
            Instant.now());
    return FDv2SourceResult.interrupted(simulatedError, false);
  }

  /**
   * Verifies that {@code future.get()} fails with an
   * {@link ExecutionException} whose cause is exactly (by identity) the
   * expected throwable. {@code CompletableFuture#get} reports exceptional
   * completion as an ExecutionException wrapping the original exception.
   */
  private static void assertThrowsExecutionExceptionWithCause(
      CompletableFuture<Object> future,
      Throwable expectedCause) throws Exception {
    ExecutionException thrown = null;
    try {
      future.get(2, TimeUnit.SECONDS);
    } catch (ExecutionException ee) {
      thrown = ee;
    }

    if (thrown == null) {
      throw new AssertionError("expected ExecutionException, got normal completion");
    }
    Throwable actualCause = thrown.getCause();
    if (actualCause != expectedCause) {
      throw new AssertionError(
          "expected cause to be " + expectedCause + " but was " + actualCause, thrown);
    }
  }

  /**
   * Test-only Condition backed by a future the test completes by hand. The
   * existing FallbackCondition/RecoveryCondition only ever resolve normally
   * (with {@code this}), so failing the aggregate's whenComplete listener
   * path requires a Condition whose future can be failed directly.
   */
  private static final class ManualCondition
      implements Condition {
    final CompletableFuture<Condition> future = new CompletableFuture<>();

    @Override
    public CompletableFuture<Condition> execute() {
      return future;
    }

    @Override
    public void inform(FDv2SourceResult sourceResult) {
      // Driven manually by the test; inform never triggers completion.
    }

    @Override
    public void close() {
      // Nothing to release: there is no timer behind this condition.
    }

    @Override
    public ConditionType getType() {
      return ConditionType.FALLBACK;
    }
  }
}
Loading