From d6bc5242bb63226231160b3404a833c272300003 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 27 Nov 2025 08:54:36 +0000 Subject: [PATCH 1/4] Initial plan From f1f222217e545fafb8eeb98a4a006ce4a97da5d5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 27 Nov 2025 09:04:02 +0000 Subject: [PATCH 2/4] Refactor SyncStreamQueueSource to use ScheduledExecutorService instead of Thread.sleep Co-authored-by: aepfli <9987394+aepfli@users.noreply.github.com> --- .../connector/sync/SyncStreamQueueSource.java | 75 +++++++++++-------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index a3b01f913..3c072bfd7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -21,7 +21,9 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -49,6 +51,7 @@ public class SyncStreamQueueSource implements QueueSource { private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final FlagSyncServiceStub flagSyncStub; private final FlagSyncServiceBlockingStub metadataStub; + private final ScheduledExecutorService scheduler; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -65,6 +68,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer Date: Thu, 27 Nov 2025 09:08:03 +0000 Subject: [PATCH 3/4] Address code review feedback for ScheduledExecutorService refactoring Co-authored-by: aepfli <9987394+aepfli@users.noreply.github.com> --- .../connector/sync/SyncStreamQueueSource.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 3c072bfd7..6eef04b00 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,7 +69,11 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer { + Thread t = new Thread(r, "flagd-sync-retry-scheduler"); + t.setDaemon(true); + return t; + }); } // internal use only @@ -86,7 +91,11 @@ protected SyncStreamQueueSource( flagSyncStub = stubMock; syncMetadataDisabled = options.isSyncMetadataDisabled(); metadataStub = blockingStubMock; - scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "flagd-sync-retry-scheduler"); + t.setDaemon(true); + return t; + }); } /** Initialize sync stream connector. */ @@ -115,7 +124,12 @@ public void shutdown() throws InterruptedException { return; } this.scheduler.shutdownNow(); - this.scheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS); + try { + this.scheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.debug("Scheduler termination was interrupted", e); + Thread.currentThread().interrupt(); + } this.channelConnector.shutdown(); } @@ -167,7 +181,12 @@ private void scheduleRetry() { log.info("Shutdown invoked, exiting event stream listener"); return; } - scheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS); + try { + scheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + // Scheduler was shut down after the shutdown check, which is fine + log.debug("Retry scheduling rejected, scheduler is shut down", e); + } } // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584 From 4704a0da96d25ad784fdaa94054e074eeec7f416 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Thu, 27 Nov 2025 10:52:40 +0100 Subject: [PATCH 4/4] fixup: gemini suggestion Signed-off-by: Simon Schrottner --- .../connector/sync/SyncStreamQueueSource.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 6eef04b00..a8ed3ae86 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -69,11 +69,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer { - Thread t = new Thread(r, "flagd-sync-retry-scheduler"); - t.setDaemon(true); - return t; - }); + scheduler = createScheduler(); } // internal use only @@ -91,7 +87,11 @@ protected SyncStreamQueueSource( flagSyncStub = stubMock; syncMetadataDisabled = options.isSyncMetadataDisabled(); metadataStub = blockingStubMock; - scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + scheduler = createScheduler(); + } + + private static ScheduledExecutorService createScheduler() { + return Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "flagd-sync-retry-scheduler"); t.setDaemon(true); return t; @@ -178,7 +178,7 @@ private void observeSyncStream() { */ private void scheduleRetry() { if (shutdown.get()) { - log.info("Shutdown invoked, exiting event stream listener"); + log.debug("Shutdown in progress, not scheduling retry."); return; } try {