-
Couldn't load subscription status.
- Fork 2.5k
fix(pool): Pool ReAuth should not interfere with handoff #3547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+1,138
−143
Merged
Changes from all commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
5fe0bfa
fix(pool): wip, pool reauth should not interfere with handoff
ndyakov d39da69
fix credListeners map
ndyakov 8a629fb
fix race in tests
ndyakov 6c54ab5
Merge branch 'master' into ndyakov/pool-reauth
ndyakov 90bfdb3
better conn usable timeout
ndyakov 07283ec
add design decision comment
ndyakov 1bbf2e6
few small improvements
ndyakov 1428068
update marked as queued
ndyakov e7dc339
add Used to clarify the state of the conn
ndyakov 77c0c73
rename test
ndyakov 011ef96
fix(test): fix flaky test
ndyakov e03396e
lock inside the listeners collection
ndyakov 391b6c5
address pr comments
ndyakov 6ad9a67
Update internal/auth/cred_listeners.go
ndyakov 0c4f8fb
Update internal/pool/buffer_size_test.go
ndyakov acb55d8
wip refactor entraid
ndyakov d74671b
fix maintnotif pool hook
ndyakov 0e10cd7
fix mocks
ndyakov afba8c2
fix nil listener
ndyakov 4bc6d33
sync and async reauth based on conn lifecycle
ndyakov 3020e3a
be able to reject connection OnGet
ndyakov f886775
pass hooks so the tests can observe reauth
ndyakov 72cf74a
give some time for the background to execute commands
ndyakov c715185
fix tests
ndyakov f14095b
only async reauth
ndyakov e94cc9f
Merge branch 'master' into ndyakov/pool-reauth
ndyakov 19f4080
Update internal/pool/pool.go
ndyakov 4049d5e
Update internal/auth/streaming/pool_hook.go
ndyakov 494a89b
Merge branch 'master' into ndyakov/pool-reauth
ndyakov 3211b13
Update internal/pool/conn.go
ndyakov b11f928
chore(redisotel): use metric.WithAttributeSet to avoid copy (#3552)
boekkooi-impossiblecloud 8ca7979
chore(docs): explain why MaxRetries is disabled for ClusterClient (#3…
Pika-Gopher ce49979
exponential backoff
ndyakov 67daad7
address pr comments
ndyakov 528f2e9
Merge remote-tracking branch 'origin/master' into ndyakov/pool-reauth
ndyakov 1ee5293
address pr comments
ndyakov 44917ef
remove rlock
ndyakov f0fbea6
add some comments
ndyakov e469358
add comments
ndyakov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
100 changes: 100 additions & 0 deletions
100
internal/auth/streaming/conn_reauth_credentials_listener.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| package streaming | ||
|
|
||
| import ( | ||
| "github.com/redis/go-redis/v9/auth" | ||
| "github.com/redis/go-redis/v9/internal/pool" | ||
| ) | ||
|
|
||
| // ConnReAuthCredentialsListener is a credentials listener for a specific connection | ||
| // that triggers re-authentication when credentials change. | ||
| // | ||
| // This listener implements the auth.CredentialsListener interface and is subscribed | ||
| // to a StreamingCredentialsProvider. When new credentials are received via OnNext, | ||
| // it marks the connection for re-authentication through the manager. | ||
| // | ||
| // The re-authentication is always performed asynchronously to avoid blocking the | ||
| // credentials provider and to prevent potential deadlocks with the pool semaphore. | ||
| // The actual re-auth happens when the connection is returned to the pool in an idle state. | ||
| // | ||
| // Lifecycle: | ||
| // - Created during connection initialization via Manager.Listener() | ||
| // - Subscribed to the StreamingCredentialsProvider | ||
| // - Receives credential updates via OnNext() | ||
| // - Cleaned up when connection is removed from pool via Manager.RemoveListener() | ||
| type ConnReAuthCredentialsListener struct { | ||
| // reAuth is the function to re-authenticate the connection with new credentials | ||
| reAuth func(conn *pool.Conn, credentials auth.Credentials) error | ||
|
|
||
| // onErr is the function to call when re-authentication or acquisition fails | ||
| onErr func(conn *pool.Conn, err error) | ||
|
|
||
| // conn is the connection this listener is associated with | ||
| conn *pool.Conn | ||
|
|
||
| // manager is the streaming credentials manager for coordinating re-auth | ||
| manager *Manager | ||
| } | ||
|
|
||
| // OnNext is called when new credentials are received from the StreamingCredentialsProvider. | ||
| // | ||
| // This method marks the connection for asynchronous re-authentication. The actual | ||
| // re-authentication happens in the background when the connection is returned to the | ||
| // pool and is in an idle state. | ||
| // | ||
| // Asynchronous re-auth is used to: | ||
| // - Avoid blocking the credentials provider's notification goroutine | ||
| // - Prevent deadlocks with the pool's semaphore (especially with small pool sizes) | ||
| // - Ensure re-auth happens when the connection is safe to use (not processing commands) | ||
| // | ||
| // The reAuthFn callback receives: | ||
| // - nil if the connection was successfully acquired for re-auth | ||
| // - error if acquisition timed out or failed | ||
| // | ||
| // Thread-safe: Called by the credentials provider's notification goroutine. | ||
| func (c *ConnReAuthCredentialsListener) OnNext(credentials auth.Credentials) { | ||
| if c.conn == nil || c.conn.IsClosed() || c.manager == nil || c.reAuth == nil { | ||
| return | ||
| } | ||
|
|
||
| // Always use async reauth to avoid complex pool semaphore issues | ||
| // The synchronous path can cause deadlocks in the pool's semaphore mechanism | ||
| // when called from the Subscribe goroutine, especially with small pool sizes. | ||
| // The connection pool hook will re-authenticate the connection when it is | ||
| // returned to the pool in a clean, idle state. | ||
| c.manager.MarkForReAuth(c.conn, func(err error) { | ||
| // err is from connection acquisition (timeout, etc.) | ||
| if err != nil { | ||
| // Log the error | ||
| c.OnError(err) | ||
| return | ||
| } | ||
| // err is from reauth command execution | ||
| err = c.reAuth(c.conn, credentials) | ||
| if err != nil { | ||
| // Log the error | ||
| c.OnError(err) | ||
| return | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| // OnError is called when an error occurs during credential streaming or re-authentication. | ||
| // | ||
| // This method can be called from: | ||
| // - The StreamingCredentialsProvider when there's an error in the credentials stream | ||
| // - The re-auth process when connection acquisition times out | ||
| // - The re-auth process when the AUTH command fails | ||
| // | ||
| // The error is delegated to the onErr callback provided during listener creation. | ||
| // | ||
| // Thread-safe: Can be called from multiple goroutines (provider, re-auth worker). | ||
| func (c *ConnReAuthCredentialsListener) OnError(err error) { | ||
| if c.onErr == nil { | ||
| return | ||
| } | ||
|
|
||
| c.onErr(c.conn, err) | ||
| } | ||
|
|
||
| // Ensure ConnReAuthCredentialsListener implements the CredentialsListener interface. | ||
| var _ auth.CredentialsListener = (*ConnReAuthCredentialsListener)(nil) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| package streaming | ||
|
|
||
| import ( | ||
| "sync" | ||
|
|
||
| "github.com/redis/go-redis/v9/auth" | ||
| ) | ||
|
|
||
| // CredentialsListeners is a thread-safe collection of credentials listeners | ||
| // indexed by connection ID. | ||
| // | ||
| // This collection is used by the Manager to maintain a registry of listeners | ||
| // for each connection in the pool. Listeners are reused when connections are | ||
| // reinitialized (e.g., after a handoff) to avoid creating duplicate subscriptions | ||
| // to the StreamingCredentialsProvider. | ||
| // | ||
| // The collection supports concurrent access from multiple goroutines during | ||
| // connection initialization, credential updates, and connection removal. | ||
| type CredentialsListeners struct { | ||
| // listeners maps connection ID to credentials listener | ||
| listeners map[uint64]auth.CredentialsListener | ||
|
|
||
| // lock protects concurrent access to the listeners map | ||
| lock sync.RWMutex | ||
| } | ||
|
|
||
| // NewCredentialsListeners creates a new thread-safe credentials listeners collection. | ||
| func NewCredentialsListeners() *CredentialsListeners { | ||
| return &CredentialsListeners{ | ||
| listeners: make(map[uint64]auth.CredentialsListener), | ||
| } | ||
| } | ||
|
|
||
| // Add adds or updates a credentials listener for a connection. | ||
| // | ||
| // If a listener already exists for the connection ID, it is replaced. | ||
| // This is safe because the old listener should have been unsubscribed | ||
| // before the connection was reinitialized. | ||
| // | ||
| // Thread-safe: Can be called concurrently from multiple goroutines. | ||
| func (c *CredentialsListeners) Add(connID uint64, listener auth.CredentialsListener) { | ||
| c.lock.Lock() | ||
| defer c.lock.Unlock() | ||
| if c.listeners == nil { | ||
| c.listeners = make(map[uint64]auth.CredentialsListener) | ||
| } | ||
| c.listeners[connID] = listener | ||
| } | ||
|
|
||
| // Get retrieves the credentials listener for a connection. | ||
| // | ||
| // Returns: | ||
| // - listener: The credentials listener for the connection, or nil if not found | ||
| // - ok: true if a listener exists for the connection ID, false otherwise | ||
| // | ||
| // Thread-safe: Can be called concurrently from multiple goroutines. | ||
| func (c *CredentialsListeners) Get(connID uint64) (auth.CredentialsListener, bool) { | ||
| c.lock.RLock() | ||
| defer c.lock.RUnlock() | ||
| if len(c.listeners) == 0 { | ||
| return nil, false | ||
| } | ||
| listener, ok := c.listeners[connID] | ||
| return listener, ok | ||
| } | ||
|
|
||
| // Remove removes the credentials listener for a connection. | ||
| // | ||
| // This is called when a connection is removed from the pool to prevent | ||
| // memory leaks. If no listener exists for the connection ID, this is a no-op. | ||
| // | ||
| // Thread-safe: Can be called concurrently from multiple goroutines. | ||
| func (c *CredentialsListeners) Remove(connID uint64) { | ||
| c.lock.Lock() | ||
| defer c.lock.Unlock() | ||
| delete(c.listeners, connID) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| package streaming | ||
|
|
||
| import ( | ||
| "errors" | ||
| "time" | ||
|
|
||
| "github.com/redis/go-redis/v9/auth" | ||
| "github.com/redis/go-redis/v9/internal/pool" | ||
| ) | ||
|
|
||
| // Manager coordinates streaming credentials and re-authentication for a connection pool. | ||
| // | ||
| // The manager is responsible for: | ||
| // - Creating and managing per-connection credentials listeners | ||
| // - Providing the pool hook for re-authentication | ||
| // - Coordinating between credentials updates and pool operations | ||
| // | ||
| // When credentials change via a StreamingCredentialsProvider: | ||
| // 1. The credentials listener (ConnReAuthCredentialsListener) receives the update | ||
| // 2. It calls MarkForReAuth on the manager | ||
| // 3. The manager delegates to the pool hook | ||
| // 4. The pool hook schedules background re-authentication | ||
| // | ||
| // The manager maintains a registry of credentials listeners indexed by connection ID, | ||
| // allowing listener reuse when connections are reinitialized (e.g., after handoff). | ||
| type Manager struct { | ||
| // credentialsListeners maps connection ID to credentials listener | ||
| credentialsListeners *CredentialsListeners | ||
|
|
||
| // pool is the connection pool being managed | ||
| pool pool.Pooler | ||
|
|
||
| // poolHookRef is the re-authentication pool hook | ||
| poolHookRef *ReAuthPoolHook | ||
| } | ||
|
|
||
| // NewManager creates a new streaming credentials manager. | ||
| // | ||
| // Parameters: | ||
| // - pl: The connection pool to manage | ||
| // - reAuthTimeout: Maximum time to wait for acquiring a connection for re-authentication | ||
| // | ||
| // The manager creates a ReAuthPoolHook sized to match the pool size, ensuring that | ||
| // re-auth operations don't exhaust the connection pool. | ||
| func NewManager(pl pool.Pooler, reAuthTimeout time.Duration) *Manager { | ||
| m := &Manager{ | ||
| pool: pl, | ||
| poolHookRef: NewReAuthPoolHook(pl.Size(), reAuthTimeout), | ||
| credentialsListeners: NewCredentialsListeners(), | ||
| } | ||
| m.poolHookRef.manager = m | ||
| return m | ||
| } | ||
|
|
||
| // PoolHook returns the pool hook for re-authentication. | ||
| // | ||
| // This hook should be registered with the connection pool to enable | ||
| // automatic re-authentication when credentials change. | ||
| func (m *Manager) PoolHook() pool.PoolHook { | ||
| return m.poolHookRef | ||
| } | ||
|
|
||
| // Listener returns or creates a credentials listener for a connection. | ||
| // | ||
| // This method is called during connection initialization to set up the | ||
| // credentials listener. If a listener already exists for the connection ID | ||
| // (e.g., after a handoff), it is reused. | ||
| // | ||
| // Parameters: | ||
| // - poolCn: The connection to create/get a listener for | ||
| // - reAuth: Function to re-authenticate the connection with new credentials | ||
| // - onErr: Function to call when re-authentication fails | ||
| // | ||
| // Returns: | ||
| // - auth.CredentialsListener: The listener to subscribe to the credentials provider | ||
| // - error: Non-nil if poolCn is nil | ||
| // | ||
| // Note: The reAuth and onErr callbacks are captured once when the listener is | ||
| // created and reused for the connection's lifetime. They should not change. | ||
| // | ||
| // Thread-safe: Can be called concurrently during connection initialization. | ||
| func (m *Manager) Listener( | ||
| poolCn *pool.Conn, | ||
| reAuth func(*pool.Conn, auth.Credentials) error, | ||
| onErr func(*pool.Conn, error), | ||
| ) (auth.CredentialsListener, error) { | ||
| if poolCn == nil { | ||
| return nil, errors.New("poolCn cannot be nil") | ||
| } | ||
| connID := poolCn.GetID() | ||
| // if we reconnect the underlying network connection, the streaming credentials listener will continue to work | ||
| // so we can get the old listener from the cache and use it. | ||
| // subscribing the same (an already subscribed) listener for a StreamingCredentialsProvider SHOULD be a no-op | ||
| listener, ok := m.credentialsListeners.Get(connID) | ||
| if !ok || listener == nil { | ||
| // Create new listener for this connection | ||
| // Note: Callbacks (reAuth, onErr) are captured once and reused for the connection's lifetime | ||
| newCredListener := &ConnReAuthCredentialsListener{ | ||
| conn: poolCn, | ||
| reAuth: reAuth, | ||
| onErr: onErr, | ||
| manager: m, | ||
| } | ||
|
|
||
| m.credentialsListeners.Add(connID, newCredListener) | ||
| listener = newCredListener | ||
| } | ||
| return listener, nil | ||
| } | ||
|
|
||
| // MarkForReAuth marks a connection for re-authentication. | ||
| // | ||
| // This method is called by the credentials listener when new credentials are | ||
| // received. It delegates to the pool hook to schedule background re-authentication. | ||
| // | ||
| // Parameters: | ||
| // - poolCn: The connection to re-authenticate | ||
| // - reAuthFn: Function to call for re-authentication, receives error if acquisition fails | ||
| // | ||
| // Thread-safe: Called by credentials listeners when credentials change. | ||
| func (m *Manager) MarkForReAuth(poolCn *pool.Conn, reAuthFn func(error)) { | ||
| connID := poolCn.GetID() | ||
| m.poolHookRef.MarkForReAuth(connID, reAuthFn) | ||
| } | ||
|
|
||
| // RemoveListener removes the credentials listener for a connection. | ||
| // | ||
| // This method is called by the pool hook's OnRemove to clean up listeners | ||
| // when connections are removed from the pool. | ||
| // | ||
| // Parameters: | ||
| // - connID: The connection ID whose listener should be removed | ||
| // | ||
| // Thread-safe: Called during connection removal. | ||
| func (m *Manager) RemoveListener(connID uint64) { | ||
| m.credentialsListeners.Remove(connID) | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.