fix(flagd): improve stream observer, refine retry policy; don't use retry to avoid busy loop #1590
Conversation
Summary of Changes
Hello @guidobrei, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly refactors the FlagSyncService to enhance the stability and correctness of stream observation. The primary goal is to prevent a potential deadlock scenario and improve the synchronization of stream cancellation. By removing a blocking queue and implementing a unified cancellation context, the service now handles metadata retrieval and flag synchronization more robustly, ensuring that stream failures or metadata issues lead to proper stream restarts.
Highlights

- Deadlock Prevention: Eliminated a potential deadlock in FlagSyncService by removing the `incomingQueue` and its blocking `take()` operation.
- Stream Synchronization: Resolved synchronization issues between `getMetadata` and `syncFlags` by sharing a single `Context.CancellableContext`, ensuring consistent cancellation behavior.
- Refactored Stream Observation: Introduced a dedicated `SyncStreamObserver` class to manage gRPC stream events directly, simplifying the stream handling logic.
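The shared-cancellation idea in the highlights can be sketched with plain JDK types. This is a hypothetical model, not the PR's code: the real implementation shares a gRPC `Context.CancellableContext`, which the `AtomicBoolean` token stands in for here, and all names are illustrative.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative model only: a single token shared by the metadata call and the
// flag stream, so cancelling one path cancels both (the real code shares a
// gRPC Context.CancellableContext instead).
public class SharedCancellationSketch {
    static final class CancellationToken {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);

        void cancel() { cancelled.set(true); }

        boolean isCancelled() { return cancelled.get(); }
    }

    // Stand-in for the getMetadata call: on failure it cancels the shared token.
    static void getMetadata(CancellationToken token) {
        if (token.isCancelled()) {
            return;
        }
        token.cancel(); // simulate a metadata failure for the demo
    }

    // Stand-in for the stream loop: it checks the SAME token, so a metadata
    // failure stops the stream too.
    static boolean syncFlags(CancellationToken token) {
        return !token.isCancelled();
    }

    public static void main(String[] args) {
        CancellationToken shared = new CancellationToken();
        getMetadata(shared);
        System.out.println("stream keeps running: " + syncFlags(shared));
    }
}
```

With two separate tokens, the metadata failure would leave the stream loop running, which is exactly the synchronization bug described above.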
 
Code Review
This pull request effectively addresses a potential deadlock in FlagSyncService by removing the blocking incomingQueue.take() call and refactors the stream handling logic to be more robust. The fix for the synchronization issue between getMetadata and syncFlags by sharing a CancellableContext is also a good improvement. My review includes a few suggestions to further improve the new implementation, mainly around error handling and thread interruption best practices. Specifically, I've pointed out an opportunity to restore the error message in the error payload, adjust a logging level, and properly handle InterruptedException.
        
          
/gemini summary
Hello guidobrei, Thank you for submitting this pull request and for requesting a review. I've analyzed the changes and here's a summary and a detailed review.

Pull Request Summary

This pull request, titled "fix(flagd): Fix stream observation in FlagSyncService", addresses two critical issues within the …

Detailed Review

I've reviewed the changes based on correctness, efficiency, and maintainability, adhering to the Google Java Style Guide for any stylistic suggestions.

Overall Feedback

This is a well-structured and impactful change. The refactoring effectively addresses the identified issues, leading to a more robust and maintainable stream observation mechanism. The separation of concerns into …

Review Comments

File: …

File: …

Conclusion

The changes in this pull request are well-thought-out and effectively address the described problems. The refactoring improves the code's correctness, efficiency, and maintainability. I recommend merging this PR once any minor suggestions are addressed.
        
          
This looks like a good simplification. I will thoroughly review on Monday 🙏
```diff
- localStub.syncFlags(syncRequest.build(), new QueueingStreamObserver<SyncFlagsResponse>(incomingQueue));
+ localStub.syncFlags(syncRequest.build(), streamObserver);

  streamObserver.done.await();
```
Do we also have to wakeup done when the shutdown method is called?
No, because when we shut down, we call shutdownNow on the underlying channel. When this happens, the onError handler runs immediately and wakes it up.
Exactly, that was the intention.
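The exchange above can be illustrated with a small JDK-only sketch (class and field names are hypothetical): shutting the channel down makes gRPC invoke onError on the observer, which counts the done latch down and releases the thread blocked in done.await().

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the observer's wake-up path: both terminal stream
// callbacks (onError and onCompleted) count the latch down, so the restart
// loop blocked in done.await() is always released, including on shutdownNow.
public class SyncStreamObserverSketch {
    static class Observer {
        final CountDownLatch done = new CountDownLatch(1);

        void onNext(Object response) {
            // enqueue the payload for the consumer (omitted here)
        }

        void onError(Throwable t) {
            // runs on stream failure AND when the channel is shut down
            done.countDown();
        }

        void onCompleted() {
            done.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Observer observer = new Observer();
        // simulate channel.shutdownNow() triggering onError on another thread
        new Thread(() -> observer.onError(new RuntimeException("channel shut down"))).start();
        observer.done.await(); // returns promptly instead of hanging forever
        System.out.println("stream loop released");
    }
}
```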
@guidobrei I pushed a spotless fix. Doing some manual testing now.
        
          
@guidobrei There are some e2e tests around context enrichment that have been consistently failing since your change. Other than this, things look great. I did some manual testing as well, and reconnect, events, etc. work fine!

EDIT: I think I found the reason here.
```java
enqueueError(String.format("Error in getMetadata request: %s", message));
continue;
```
99% sure this is causing our test failure. @aepfli introduced this option in flagd to help us test the migration from the old getMetadata call to the new syncContext payload - in some e2e tests, flagd runs with the old getMetadata disabled... which is causing an error here immediately, even though we have the new syncContext.
We need to "save" this exception state, and only throw if we DON'T get the new alternative syncContext.
Does that make sense?
@aepfli please verify what I said 🙏
Either this, or we check for gRPC status code 12 UNIMPLEMENTED or 5 NOT_FOUND.
Relying on the status code would be the easier implementation.
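The status-code approach discussed here can be sketched as follows: treat UNIMPLEMENTED or NOT_FOUND from getMetadata as "the RPC is absent or disabled" and fall back to the stream's syncContext, while any other code still triggers a restart. This is an illustrative sketch: StatusCode is a stand-in enum, and the real code would inspect the io.grpc.Status extracted from the StatusRuntimeException.

```java
// Hypothetical decision helper for the getMetadata fallback discussed above.
public class MetadataFallbackSketch {
    enum StatusCode { OK, NOT_FOUND, UNIMPLEMENTED, UNAVAILABLE, DEADLINE_EXCEEDED }

    /** True when the error means the RPC is absent/disabled rather than broken. */
    static boolean shouldFallBackToSyncContext(StatusCode code) {
        return code == StatusCode.UNIMPLEMENTED || code == StatusCode.NOT_FOUND;
    }

    public static void main(String[] args) {
        // old getMetadata disabled in flagd -> use the stream's syncContext
        System.out.println(shouldFallBackToSyncContext(StatusCode.UNIMPLEMENTED));
        // transient failure -> still restart the stream
        System.out.println(shouldFallBackToSyncContext(StatusCode.UNAVAILABLE));
    }
}
```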
I think checking the error code is also a good idea; based on https://github.com/open-feature/flagd/blob/fcd19b310b319c9837b41c19d6f082ac750cb81d/flagd/pkg/service/flag-sync/handler.go#L111 it is UNIMPLEMENTED.
You can get UNIMPLEMENTED for other cases, like missing implementations - so I don't think that will work.
But isn't this what we want to have? If the implementation is missing or disabled, we use whatever we get via the syncFlags() stream.
Added an example of how this could look in this commit.
Yes I think you're right! lol sorry, I confused myself.
@guidobrei The test is still failing, but if I make the catch block more general, it works. I think either the status code can't be checked the way you're checking it, or the exception is not of the expected type.
Today I learned: 0d6c840
```java
@SuppressWarnings({"unchecked", "rawtypes"})
private static final Map<String, ?> DEFAULT_RETRY_POLICY = new HashMap() {
    {
        // 1 + 2 + 4
```
What does that comment mean? The number 7 does not show up in the values below
```java
));
put("retryPolicy", new HashMap(DEFAULT_RETRY_POLICY) {
    {
        // 1 + 2 + 4 + 5 + 5 + 5 + 5 + 5 + 5 + 5
```
What does that comment mean? The result does not show up in the values below
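One plausible reading of these comments (an assumption, since the PR doesn't spell it out): they list the successive backoff intervals in seconds of an exponential policy with a 1s initial backoff, multiplier 2, and a 5s cap, summed to the total worst-case wait. The parameters below are chosen to reproduce those sums and are illustrative only.

```java
// Computes the cumulative wait of a capped exponential backoff, to show where
// sums like "1 + 2 + 4" and "1 + 2 + 4 + 5 + 5 + ..." could come from.
public class BackoffSumSketch {
    static int totalBackoffSeconds(int initial, int multiplier, int cap, int retries) {
        int total = 0;
        int interval = initial;
        for (int i = 0; i < retries; i++) {
            total += interval;                          // wait before this retry
            interval = Math.min(interval * multiplier, cap); // grow, but cap it
        }
        return total;
    }

    public static void main(String[] args) {
        // 1 + 2 + 4 = 7 seconds across 3 retries
        System.out.println(totalBackoffSeconds(1, 2, 5, 3));
        // 1 + 2 + 4 + 5 + 5 + 5 + 5 + 5 + 5 + 5 = 42 seconds across 10 retries
        System.out.println(totalBackoffSeconds(1, 2, 5, 10));
    }
}
```

If this reading is right, stating the total (7s / 42s) next to the sum in the source comments would answer the reviewer's question directly.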
```java
 * deadline, and definitionally should not result in a tight loop
 * (it's a timeout).
 */
Code.CANCELLED.toString(),
```
Why do we have here so many more status codes than in the other policy?
We use the gRPC retry policy in syncFlags so we don't end up in a tight reconnect loop, trying to reconnect instantly.
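For reference, a retry policy like the one discussed is handed to gRPC Java as a service-config map. The sketch below uses assumed values: the attempt count, backoffs, status codes, and the flagd.sync.v1.FlagSyncService name are illustrative, not the PR's actual settings. The key names and value types follow gRPC's service-config format, where numbers are doubles and durations are strings like "1s".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of a gRPC service config carrying a retry policy (values assumed).
public class RetryPolicySketch {
    static Map<String, Object> defaultRetryPolicy() {
        Map<String, Object> retryPolicy = new HashMap<>();
        retryPolicy.put("maxAttempts", 4.0);      // service-config numbers are doubles
        retryPolicy.put("initialBackoff", "1s");  // durations are strings
        retryPolicy.put("maxBackoff", "5s");
        retryPolicy.put("backoffMultiplier", 2.0);
        retryPolicy.put("retryableStatusCodes", Arrays.asList("UNAVAILABLE"));
        return retryPolicy;
    }

    static Map<String, Object> serviceConfig() {
        Map<String, Object> methodConfig = new HashMap<>();
        // assumed fully-qualified service name for flagd's sync API
        methodConfig.put("name", Collections.singletonList(
                Collections.singletonMap("service", "flagd.sync.v1.FlagSyncService")));
        methodConfig.put("retryPolicy", defaultRetryPolicy());
        Map<String, Object> config = new HashMap<>();
        config.put("methodConfig", Collections.singletonList(methodConfig));
        return config;
    }
    // usage: channelBuilder.defaultServiceConfig(serviceConfig()).enableRetry()
}
```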
          
It must be a result of the retry policy changes, right? Should we just avoid changes there for now? I'm not super happy with it, but I think it's distinct from the deadlock bug you've found.
          
If we don't change the retry policy, we can't check for "UNIMPLEMENTED", because we would retry and eventually fail with DEADLINE_EXCEEDED.
    
… added Stream cancellation

Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
```java
 */
@Builder.Default
private int retryBackoffMaxMs =
        fallBackToEnvOrDefault(Config.FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME, Config.DEFAULT_MAX_RETRY_BACKOFF);
```
We're mixing ms and s here.
Is `static final int DEFAULT_MAX_RETRY_BACKOFF = 5;` a time value or a count?
It's a time. I'll make the default's name clearer. It should also be 12000, as per https://flagd.dev/providers/rust/?h=flagd_retry_backoff_max_ms#configuration-options... nice catch... this was leftover from my initial version, before I realized we had an unimplemented var for this.
        
          
```java
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
Thread.sleep(this.maxBackoffMs);
```
This sleep helps prevent tight loops during retries, which can be a problem if an upstream connection in a proxy is down.
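The sleep-between-retries pattern, together with the InterruptedException handling the review asked for, can be sketched like this (method names are hypothetical): the interrupt flag is restored so an in-flight shutdown still propagates to the caller instead of being silently swallowed.

```java
// Sketch of backing off between retries while keeping shutdown responsive.
public class BackoffSleepSketch {
    /**
     * Sleeps between retries to avoid a tight reconnect loop.
     * Returns false if interrupted, signalling the caller to stop retrying.
     */
    static boolean backoff(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate a shutdown interrupt
        boolean keepGoing = backoff(1000);  // returns immediately, no busy wait
        System.out.println("continue retrying: " + keepGoing);
        System.out.println("interrupt preserved: " + Thread.currentThread().isInterrupted());
    }
}
```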
This PR
Related Issues
Fixes #1583
Notes
We've identified two related issues in the current implementation:

1. `incomingQueue.take()` blocks until some message is put in the blocking queue, which can deadlock.
2. When `getMetadata` fails, we want to cancel the stream. However, this never worked properly because the two parts listened on different cancellation tokens.

The new implementation eliminates problem 1) by removing the `incomingQueue` entirely and mitigates 2) by sharing a cancellation token.

Follow-up Tasks
Still open is the removal of deprecated getMetadata open-feature/flagd#1584.
How to test
Flagd testbed covers FlagSyncService.