Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand Down Expand Up @@ -49,6 +52,7 @@ public class SyncStreamQueueSource implements QueueSource {
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final FlagSyncServiceStub flagSyncStub;
private final FlagSyncServiceBlockingStub metadataStub;
private final ScheduledExecutorService scheduler;

/**
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
Expand All @@ -65,6 +69,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
.withWaitForReady();
scheduler = createScheduler();
}

// internal use only
Expand All @@ -82,6 +87,15 @@ protected SyncStreamQueueSource(
flagSyncStub = stubMock;
syncMetadataDisabled = options.isSyncMetadataDisabled();
metadataStub = blockingStubMock;
scheduler = createScheduler();
}

private static ScheduledExecutorService createScheduler() {
return Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "flagd-sync-retry-scheduler");
t.setDaemon(true);
return t;
});
}

/** Initialize sync stream connector. */
Expand Down Expand Up @@ -109,6 +123,13 @@ public void shutdown() throws InterruptedException {
log.debug("Shutdown already in progress or completed");
return;
}
this.scheduler.shutdownNow();
try {
this.scheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.debug("Scheduler termination was interrupted", e);
Thread.currentThread().interrupt();
}
this.channelConnector.shutdown();
}

Expand All @@ -120,45 +141,54 @@ private void observeSyncStream() {
// "waitForReady" on the channel, plus our retry policy slow this loop down in
// error conditions
while (!shutdown.get()) {
if (shouldThrottle.getAndSet(false)) {
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
scheduleRetry();
return;
}

log.debug("Initializing sync stream request");
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle);
try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
shouldThrottle.set(true);
continue;
}

try {
if (shouldThrottle.getAndSet(false)) {
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
Thread.sleep(this.maxBackoffMs);

// Check shutdown again after sleep to avoid unnecessary work
if (shutdown.get()) {
break;
}
}

log.debug("Initializing sync stream request");
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle);
try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
shouldThrottle.set(true);
continue;
}

try {
syncFlags(observer);
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
shouldThrottle.set(true);
}
} catch (InterruptedException ie) {
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
syncFlags(observer);
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
shouldThrottle.set(true);
}
}

log.info("Shutdown invoked, exiting event stream listener");
}

/**
* Schedules a retry of the sync stream after the backoff period.
* Uses a non-blocking approach instead of Thread.sleep().
*/
private void scheduleRetry() {
if (shutdown.get()) {
log.debug("Shutdown in progress, not scheduling retry.");
return;
}
try {
scheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
// Scheduler was shut down after the shutdown check, which is fine
log.debug("Retry scheduling rejected, scheduler is shut down", e);
}
}

// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
private Struct getMetadata() {
if (syncMetadataDisabled) {
Expand Down
Loading