Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class SyncStreamQueueSource implements QueueSource {
private static final int QUEUE_SIZE = 5;

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
private final int streamDeadline;
private final int deadline;
private final int maxBackoffMs;
Expand Down Expand Up @@ -102,7 +103,10 @@ public BlockingQueue<QueuePayload> getStreamQueue() {
* @throws InterruptedException if stream can't be closed within deadline.
*/
public void shutdown() throws InterruptedException {
if (shutdown.getAndSet(true)) {
// Use atomic compareAndSet to ensure shutdown is only executed once
// This prevents race conditions when shutdown is called from multiple threads
if (!shutdown.compareAndSet(false, true)) {
log.debug("Shutdown already in progress or completed");
return;
}
this.channelConnector.shutdown();
Expand All @@ -117,16 +121,26 @@ private void observeSyncStream() {
// error conditions
while (!shutdown.get()) {
try {
if (shouldThrottle.getAndSet(false)) {
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
Thread.sleep(this.maxBackoffMs);

// Check shutdown again after sleep to avoid unnecessary work
if (shutdown.get()) {
break;
}
}

log.debug("Initializing sync stream request");
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle);
try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
Thread.sleep(this.maxBackoffMs);
shouldThrottle.set(true);
continue;
}

Expand All @@ -135,7 +149,7 @@ private void observeSyncStream() {
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
Thread.sleep(this.maxBackoffMs);
shouldThrottle.set(true);
}
} catch (InterruptedException ie) {
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
Expand Down Expand Up @@ -209,12 +223,14 @@ private static void enqueueError(BlockingQueue<QueuePayload> queue, String messa

private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
private final BlockingQueue<QueuePayload> outgoingQueue;
private final AtomicBoolean shouldThrottle;
private final Awaitable done = new Awaitable();

private Struct metadata;

public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue) {
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle) {
this.outgoingQueue = outgoingQueue;
this.shouldThrottle = shouldThrottle;
}

@Override
Expand All @@ -235,6 +251,9 @@ public void onError(Throwable throwable) {
String message = throwable != null ? throwable.getMessage() : "unknown";
log.debug("Stream error: {}, will restart", message, throwable);
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));

// Set throttling flag to ensure backoff before retry
this.shouldThrottle.set(true);
} finally {
done.wakeup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class SyncStreamQueueSourceTest {
private ChannelConnector mockConnector;
private FlagSyncServiceBlockingStub blockingStub;
private FlagSyncServiceStub stub;
private FlagSyncServiceStub errorStub;
private FlagSyncServiceStub syncErrorStub;
private FlagSyncServiceStub asyncErrorStub;
private StreamObserver<SyncFlagsResponse> observer;
private CountDownLatch latch; // used to wait for observer to be initialized

Expand All @@ -60,25 +61,76 @@ public void setup() throws Exception {
.when(stub)
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize

errorStub = mock(FlagSyncServiceStub.class);
when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub);
syncErrorStub = mock(FlagSyncServiceStub.class);
when(syncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(syncErrorStub);
doAnswer((Answer<Void>) invocation -> {
Object[] args = invocation.getArguments();
observer = (StreamObserver<SyncFlagsResponse>) args[1];
latch.countDown();
throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND);
})
.when(errorStub)
.when(syncErrorStub)
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize

asyncErrorStub = mock(FlagSyncServiceStub.class);
when(asyncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(asyncErrorStub);
doAnswer((Answer<Void>) invocation -> {
Object[] args = invocation.getArguments();
observer = (StreamObserver<SyncFlagsResponse>) args[1];
latch.countDown();

// Start a thread to call onError after a short delay
new Thread(() -> {
try {
Thread.sleep(10); // Wait 10ms before calling onError
observer.onError(new StatusRuntimeException(io.grpc.Status.INTERNAL));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})
.start();

return null;
})
.when(asyncErrorStub)
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
}

@Test
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Git is rendering this oddly; this is the previous unchanged test (other than the name). The new one is below.

void syncInitError_DoesNotBusyWait() throws Exception {
    // Guards against a tight retry loop when the stream call fails immediately:
    // the stubbed syncFlags on syncErrorStub throws as soon as it is invoked.
    final int backoffMs = 1000;
    final SyncStreamQueueSource source = new SyncStreamQueueSource(
            FlagdOptions.builder().retryBackoffMaxMs(backoffMs).build(),
            mockConnector,
            syncErrorStub,
            blockingStub);

    latch = new CountDownLatch(1);
    source.init();
    // Block until the stubbed syncFlags call has counted the latch down.
    latch.await();

    // The first payload published on the stream queue must be an error.
    final QueuePayload firstPayload = source.getStreamQueue().poll(1000, TimeUnit.MILLISECONDS);
    assertNotNull(firstPayload);
    assertEquals(QueuePayloadType.ERROR, firstPayload.getType());

    // Wait 1.5x the configured backoff so a single retry has time to occur.
    final int settleMs = backoffMs + (backoffMs / 2);
    Thread.sleep(settleMs);

    // Exactly two calls are expected: the initial attempt plus one retry.
    // A low call count is the evidence that the loop is not busy-waiting.
    verify(syncErrorStub, times(2)).syncFlags(any(), any());
}

@Test
void initError_DoesNotBusyWait() throws Exception {
// make sure we do not spin in a busy loop on errors
void asyncInitError_DoesNotBusyWait() throws Exception {
Copy link
Member Author

@toddbaert toddbaert Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test exercises the fix (the onError runs in the asyncErrorStub).

// make sure we do not spin in a busy loop on async errors

int maxBackoffMs = 1000;
SyncStreamQueueSource queueSource = new SyncStreamQueueSource(
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), mockConnector, errorStub, blockingStub);
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(),
mockConnector,
asyncErrorStub,
blockingStub);
latch = new CountDownLatch(1);
queueSource.init();
latch.await();
Expand All @@ -91,7 +143,7 @@ void initError_DoesNotBusyWait() throws Exception {

// should have retried the stream (2 calls); initial + 1 retry
// it's very important that the retry count is low, to confirm no busy-loop
verify(errorStub, times(2)).syncFlags(any(), any());
verify(asyncErrorStub, times(2)).syncFlags(any(), any());
}

@Test
Expand Down