From 958cd38566ebc63a36f7b33bb51a1c2c4692f57c Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Fri, 14 Nov 2025 13:31:34 -0500 Subject: [PATCH] fix: possible tight busy loop on certain connection errors Signed-off-by: Todd Baert --- .../connector/sync/SyncStreamQueueSource.java | 29 ++++++-- .../sync/SyncStreamQueueSourceTest.java | 68 ++++++++++++++++--- 2 files changed, 84 insertions(+), 13 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index d68adaeae..196ab77a6 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -38,6 +38,7 @@ public class SyncStreamQueueSource implements QueueSource { private static final int QUEUE_SIZE = 5; private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); private final int streamDeadline; private final int deadline; private final int maxBackoffMs; @@ -102,7 +103,10 @@ public BlockingQueue getStreamQueue() { * @throws InterruptedException if stream can't be closed within deadline. 
*/ public void shutdown() throws InterruptedException { - if (shutdown.getAndSet(true)) { + // Use atomic compareAndSet to ensure shutdown is only executed once + // This prevents race conditions when shutdown is called from multiple threads + if (!shutdown.compareAndSet(false, true)) { + log.debug("Shutdown already in progress or completed"); return; } this.channelConnector.shutdown(); @@ -117,8 +121,18 @@ private void observeSyncStream() { // error conditions while (!shutdown.get()) { try { + if (shouldThrottle.getAndSet(false)) { + log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); + Thread.sleep(this.maxBackoffMs); + + // Check shutdown again after sleep to avoid unnecessary work + if (shutdown.get()) { + break; + } + } + log.debug("Initializing sync stream request"); - SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue); + SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle); try { observer.metadata = getMetadata(); } catch (Exception metaEx) { @@ -126,7 +140,7 @@ private void observeSyncStream() { String message = metaEx.getMessage(); log.debug("Metadata request error: {}, will restart", message, metaEx); enqueueError(String.format("Error in getMetadata request: %s", message)); - Thread.sleep(this.maxBackoffMs); + shouldThrottle.set(true); continue; } @@ -135,7 +149,7 @@ private void observeSyncStream() { } catch (Exception ex) { log.error("Unexpected sync stream exception, will restart.", ex); enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); - Thread.sleep(this.maxBackoffMs); + shouldThrottle.set(true); } } catch (InterruptedException ie) { log.debug("Stream loop interrupted, most likely shutdown was invoked", ie); @@ -209,12 +223,14 @@ private static void enqueueError(BlockingQueue queue, String messa private static class SyncStreamObserver implements StreamObserver { private final BlockingQueue outgoingQueue; + private final AtomicBoolean 
shouldThrottle; private final Awaitable done = new Awaitable(); private Struct metadata; - public SyncStreamObserver(BlockingQueue outgoingQueue) { + public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle) { this.outgoingQueue = outgoingQueue; + this.shouldThrottle = shouldThrottle; } @Override @@ -235,6 +251,9 @@ public void onError(Throwable throwable) { String message = throwable != null ? throwable.getMessage() : "unknown"; log.debug("Stream error: {}, will restart", message, throwable); enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + + // Set throttling flag to ensure backoff before retry + this.shouldThrottle.set(true); } finally { done.wakeup(); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java index 02098b1cb..ef632a829 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java @@ -36,7 +36,8 @@ class SyncStreamQueueSourceTest { private ChannelConnector mockConnector; private FlagSyncServiceBlockingStub blockingStub; private FlagSyncServiceStub stub; - private FlagSyncServiceStub errorStub; + private FlagSyncServiceStub syncErrorStub; + private FlagSyncServiceStub asyncErrorStub; private StreamObserver observer; private CountDownLatch latch; // used to wait for observer to be initialized @@ -60,25 +61,76 @@ public void setup() throws Exception { .when(stub) .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize - errorStub = mock(FlagSyncServiceStub.class); - 
when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub); + syncErrorStub = mock(FlagSyncServiceStub.class); + when(syncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(syncErrorStub); doAnswer((Answer) invocation -> { Object[] args = invocation.getArguments(); observer = (StreamObserver) args[1]; latch.countDown(); throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND); }) - .when(errorStub) + .when(syncErrorStub) .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize + + asyncErrorStub = mock(FlagSyncServiceStub.class); + when(asyncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(asyncErrorStub); + doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + observer = (StreamObserver) args[1]; + latch.countDown(); + + // Start a thread to call onError after a short delay + new Thread(() -> { + try { + Thread.sleep(10); // Wait 10ms before calling onError + observer.onError(new StatusRuntimeException(io.grpc.Status.INTERNAL)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .start(); + + return null; + }) + .when(asyncErrorStub) + .syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize + } + + @Test + void syncInitError_DoesNotBusyWait() throws Exception { + // make sure we do not spin in a busy loop on immediate errors + + int maxBackoffMs = 1000; + SyncStreamQueueSource queueSource = new SyncStreamQueueSource( + FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), + mockConnector, + syncErrorStub, + blockingStub); + latch = new CountDownLatch(1); + queueSource.init(); + latch.await(); + + BlockingQueue streamQueue = queueSource.getStreamQueue(); + QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); + assertNotNull(payload); + assertEquals(QueuePayloadType.ERROR, payload.getType()); + Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for retries 
+ + // should have retried the stream (2 calls); initial + 1 retry + // it's very important that the retry count is low, to confirm no busy-loop + verify(syncErrorStub, times(2)).syncFlags(any(), any()); } @Test - void initError_DoesNotBusyWait() throws Exception { - // make sure we do not spin in a busy loop on errors + void asyncInitError_DoesNotBusyWait() throws Exception { + // make sure we do not spin in a busy loop on async errors int maxBackoffMs = 1000; SyncStreamQueueSource queueSource = new SyncStreamQueueSource( - FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), mockConnector, errorStub, blockingStub); + FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), + mockConnector, + asyncErrorStub, + blockingStub); latch = new CountDownLatch(1); queueSource.init(); latch.await(); @@ -91,7 +143,7 @@ void initError_DoesNotBusyWait() throws Exception { // should have retried the stream (2 calls); initial + 1 retry // it's very important that the retry count is low, to confirm no busy-loop - verify(errorStub, times(2)).syncFlags(any(), any()); + verify(asyncErrorStub, times(2)).syncFlags(any(), any()); } @Test