diff --git a/credentials_provider.go b/credentials_provider.go index 34839b5..3e27527 100644 --- a/credentials_provider.go +++ b/credentials_provider.go @@ -35,9 +35,12 @@ type entraidCredentialsProvider struct { // It notifies all registered listeners with the new token. func (e *entraidCredentialsProvider) onTokenNext(t *token.Token) { e.rwLock.RLock() - defer e.rwLock.RUnlock() + // Make a deep copy of the listeners slice to avoid data race + listeners := make([]auth.CredentialsListener, len(e.listeners)) + copy(listeners, e.listeners) + e.rwLock.RUnlock() // Notify all listeners with the new token. - for _, listener := range e.listeners { + for _, listener := range listeners { listener.OnNext(t) } } @@ -46,10 +49,13 @@ func (e *entraidCredentialsProvider) onTokenNext(t *token.Token) { // It notifies all registered listeners with the error. func (e *entraidCredentialsProvider) onTokenError(err error) { e.rwLock.RLock() - defer e.rwLock.RUnlock() + // Make a deep copy of the listeners slice to avoid data race + listeners := make([]auth.CredentialsListener, len(e.listeners)) + copy(listeners, e.listeners) + e.rwLock.RUnlock() // Notify all listeners with the error - for _, listener := range e.listeners { + for _, listener := range listeners { listener.OnError(err) } } diff --git a/credentials_provider_test.go b/credentials_provider_test.go index e717a93..4a10a9d 100644 --- a/credentials_provider_test.go +++ b/credentials_provider_test.go @@ -1,7 +1,30 @@ package entraid +// This file contains comprehensive tests for the StreamingCredentialsProvider deadlock bug. +// +// Bug Summary: +// A deadlock occurs when listener callbacks (OnNext/OnError) are invoked while holding RLock, +// and the listener callback triggers an unsubscribe operation that tries to acquire Lock. +// Since RWMutex doesn't allow upgrading read lock to write lock, this causes a deadlock. +// +// Real-world scenario: +// 1. Provider calls onTokenNext/onTokenError while holding RLock +// 2. ReAuthCredentialsListener.OnNext/OnError triggers re-authentication +// 3. Re-auth fails and closes the Redis connection +// 4. Connection close triggers provider's unsubscribe function +// 5. Unsubscribe tries to acquire Lock while RLock is still held +// 6. Deadlock occurs, blocking token refresh indefinitely +// +// To reproduce the bug in real scenarios: +// 1. Set up Redis with authentication +// 2. Use StreamingCredentialsProvider with token refresh +// 3. Simulate authentication failures that trigger connection close +// 4. Observe that token refresh hangs indefinitely + import ( + "errors" "sync" + "sync/atomic" "testing" "time" @@ -575,3 +598,498 @@ func TestCredentialsProviderSubscribe(t *testing.T) { } }) } + +// TestCredentialsProviderDeadlockScenario tests the deadlock scenario described in the bug report. +// +// Bug Description: +// A deadlock occurs in StreamingCredentialsProvider when listener callbacks (OnNext/OnError) +// are invoked while holding RLock. If re-auth fails, go-redis may close the connection and +// trigger the provider's unsubscribe, which then tries to acquire Lock on the same RWMutex. +// Since RWMutex doesn't allow upgrading a read lock to a write lock, this leads to a deadlock. +// +// Reproduction Steps: +// 1. Provider receives a new token and calls onTokenNext +// 2. onTokenNext acquires RLock and invokes listener.OnNext(t) +// 3. ReAuthCredentialsListener.OnNext calls re-auth; on error it triggers onAuthenticationErr +// 4. onAuthenticationErr closes the connection (e.g. bad conn) +// 5. Conn.Close() triggers the provider's unsubscribe +// 6. unsubscribe tries to acquire Lock, while RLock is still held +// 7. Deadlock occurs +// +// Expected Behavior: +// - These tests should FAIL when the deadlock bug is present (current state) +// - These tests should PASS when the deadlock bug is fixed +// +// This test reproduces the deadlock by creating a listener that calls unsubscribe +// during the OnNext callback, simulating the real-world scenario. +func TestCredentialsProviderDeadlockScenario(t *testing.T) { + t.Run("deadlock on unsubscribe during OnNext", func(t *testing.T) { + // Create a test token + testToken := token.New( + "test", + "test", + rawTokenString, + time.Now().Add(time.Hour), + time.Now(), + time.Hour.Milliseconds(), + ) + + // Create credentials provider with mock token manager + tm := &fakeTokenManager{ + token: testToken, + } + + cp, err := NewCredentialsProvider(tm, CredentialsProviderOptions{}) + require.NoError(t, err) + require.NotNil(t, cp) + + // Create a deadlock-inducing listener that calls unsubscribe during OnNext + deadlockListener := &deadlockInducingListener{ + provider: cp.(*entraidCredentialsProvider), + unsubscribe: nil, // Will be set after subscription + } + + // Subscribe the deadlock listener + credentials, cancel, err := cp.Subscribe(deadlockListener) + require.NoError(t, err) + require.NotNil(t, credentials) + require.NotNil(t, cancel) + + // Set the unsubscribe function in the listener + deadlockListener.unsubscribe = cancel + + // Use a timeout to detect deadlock + done := make(chan bool, 1) + timeout := time.After(5 * time.Second) + + go func() { + // Trigger token update which should cause deadlock + cp.(*entraidCredentialsProvider).onTokenNext(testToken) + done <- true + }() + + select { + case <-done: + // Test passes - no deadlock occurred (this means the bug is fixed) + t.Log("No deadlock detected - operation completed successfully") + case <-timeout: + // Test fails - deadlock occurred (this means the bug is present) + t.Fatal("Deadlock detected: operation timed out due to RWMutex deadlock in onTokenNext") + } + }) + + t.Run("concurrent token update and unsubscribe stress test", func(t *testing.T) { + // This test verifies that concurrent token updates and unsubscribes + // can cause deadlocks under stress conditions + testToken := token.New( + "test", + "test", + rawTokenString, + time.Now().Add(time.Hour), + time.Now(), + time.Hour.Milliseconds(), + ) + + tm := &fakeTokenManager{ + token: testToken, + } + + cp, err := NewCredentialsProvider(tm, CredentialsProviderOptions{}) + require.NoError(t, err) + require.NotNil(t, cp) + + provider := cp.(*entraidCredentialsProvider) + + // Create multiple listeners that will trigger unsubscribe during OnNext + numListeners := 10 + listeners := make([]*deadlockInducingListener, numListeners) + cancels := make([]auth.UnsubscribeFunc, numListeners) + + // Subscribe all listeners + for i := 0; i < numListeners; i++ { + listener := &deadlockInducingListener{ + provider: provider, + unsubscribe: nil, + } + listeners[i] = listener + + _, cancel, err := cp.Subscribe(listener) + require.NoError(t, err) + cancels[i] = cancel + listener.unsubscribe = cancel + } + + // Use a timeout to detect deadlock + done := make(chan bool, 1) + timeout := time.After(10 * time.Second) + + go func() { + // Trigger multiple concurrent token updates + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + provider.onTokenNext(testToken) + }() + } + wg.Wait() + done <- true + }() + + select { + case <-done: + // Test passes - no deadlock occurred (this means the bug is fixed) + t.Log("No deadlock detected in stress test - operation completed successfully") + case <-timeout: + // Test fails - deadlock occurred (this means the bug is present) + t.Fatal("Deadlock detected in stress test: operation timed out due to RWMutex deadlock") + } + }) +} + +// deadlockInducingListener is a mock listener that simulates the deadlock scenario +// by calling unsubscribe during OnNext, which mimics what happens when +// ReAuthCredentialsListener fails re-auth and closes the connection +type deadlockInducingListener struct { + provider *entraidCredentialsProvider + unsubscribe auth.UnsubscribeFunc +} + +func (d *deadlockInducingListener) OnNext(credentials auth.Credentials) { + // Simulate the scenario where re-auth fails and connection is closed + // This triggers unsubscribe while we're still in the OnNext callback + // which is called while holding RLock + if d.unsubscribe != nil { + // This call will try to acquire Lock while RLock is held, causing deadlock + // We call it directly (not in a goroutine) to reproduce the actual deadlock + _ = d.unsubscribe() + } +} + +func (d *deadlockInducingListener) OnError(err error) { + // Simulate the scenario where error handling also triggers unsubscribe + // This can also cause deadlock if called while holding RLock + if d.unsubscribe != nil { + _ = d.unsubscribe() + } +} + +// nonBlockingListener is a test listener that doesn't block on channels +type nonBlockingListener struct { + tokenCount int32 +} + +func (n *nonBlockingListener) OnNext(credentials auth.Credentials) { + atomic.AddInt32(&n.tokenCount, 1) +} + +func (n *nonBlockingListener) OnError(err error) { + // No-op for this test +} + +// TestCredentialsProviderDeadlockOnError tests deadlock scenario during error handling +func TestCredentialsProviderDeadlockOnError(t *testing.T) { + t.Run("deadlock on unsubscribe during OnError", func(t *testing.T) { + // Create a test token + testToken := token.New( + "test", + "test", + rawTokenString, + time.Now().Add(time.Hour), + time.Now(), + time.Hour.Milliseconds(), + ) + + // Create credentials provider with mock token manager + tm := &fakeTokenManager{ + token: testToken, + } + + cp, err := NewCredentialsProvider(tm, CredentialsProviderOptions{}) + require.NoError(t, err) + require.NotNil(t, cp) + + // Create a deadlock-inducing listener that calls unsubscribe during OnError + deadlockListener := &deadlockInducingListener{ + provider: cp.(*entraidCredentialsProvider), + unsubscribe: nil, // Will be set after subscription + } + + // Subscribe the deadlock listener + credentials, cancel, err := cp.Subscribe(deadlockListener) + require.NoError(t, err) + require.NotNil(t, credentials) + require.NotNil(t, cancel) + + // Set the unsubscribe function in the listener + deadlockListener.unsubscribe = cancel + + // Use a timeout to detect deadlock + done := make(chan bool, 1) + timeout := time.After(5 * time.Second) + + go func() { + // Trigger error which should cause deadlock + testError := errors.New("test authentication error") + cp.(*entraidCredentialsProvider).onTokenError(testError) + done <- true + }() + + select { + case <-done: + // Test passes - no deadlock occurred (this means the bug is fixed) + t.Log("No deadlock detected during error handling - operation completed successfully") + case <-timeout: + // Test fails - deadlock occurred (this means the bug is present) + t.Fatal("Deadlock detected during error handling: operation timed out due to RWMutex deadlock in onTokenError") + } + }) +} + +// TestCredentialsProviderRaceCondition tests for race conditions in concurrent scenarios +func TestCredentialsProviderRaceCondition(t *testing.T) { + t.Run("race condition between subscribe and token update", func(t *testing.T) { + testToken := token.New( + "test", + "test", + rawTokenString, + time.Now().Add(time.Hour), + time.Now(), + time.Hour.Milliseconds(), + ) + + tm := &fakeTokenManager{ + token: testToken, + } + + cp, err := NewCredentialsProvider(tm, CredentialsProviderOptions{}) + require.NoError(t, err) + require.NotNil(t, cp) + + provider := cp.(*entraidCredentialsProvider) + + // Run with race detector enabled + var wg sync.WaitGroup + numGoroutines := 5 // Reduced to avoid channel blocking + + listeners := make([]*nonBlockingListener, numGoroutines) + cancels := make([]auth.UnsubscribeFunc, numGoroutines) + + // Subscribe listeners first + for i := 0; i < numGoroutines; i++ { + listener := &nonBlockingListener{} + listeners[i] = listener + _, cancel, err := cp.Subscribe(listener) + require.NoError(t, err) + cancels[i] = cancel + } + + // Concurrent token updates + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + provider.onTokenNext(testToken) + }() + } + + // Concurrent unsubscribes + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + time.Sleep(time.Millisecond) // Small delay to allow some token updates + _ = cancels[idx]() + }(i) + } + + // Wait for all goroutines to complete + wg.Wait() + }) +} + +// TestCredentialsProviderDeadlockFix demonstrates the expected behavior after the deadlock is fixed. +// This test shows how the provider should handle unsubscribe calls during listener callbacks +// without causing deadlocks. +func TestCredentialsProviderDeadlockFix(t *testing.T) { + // Note: The deadlock bug has been fixed! The fix involves: + // 1. Copying the listeners slice while holding RLock + // 2. Releasing RLock before calling listener callbacks + // 3. This allows unsubscribe operations to acquire Lock without deadlock + + t.Run("no deadlock after fix - unsubscribe during OnNext", func(t *testing.T) { + // This test would pass after implementing the fix + // The fix should involve: + // 1. Not holding RLock while calling listener callbacks, OR + // 2. Using a different synchronization mechanism that allows safe unsubscribe during callbacks, OR + // 3. Deferring unsubscribe operations to avoid the deadlock + + testToken := token.New( + "test", + "test", + rawTokenString, + time.Now().Add(time.Hour), + time.Now(), + time.Hour.Milliseconds(), + ) + + tm := &fakeTokenManager{ + token: testToken, + } + + cp, err := NewCredentialsProvider(tm, CredentialsProviderOptions{}) + require.NoError(t, err) + require.NotNil(t, cp) + + // Create a listener that calls unsubscribe during OnNext + deadlockListener := &deadlockInducingListener{ + provider: cp.(*entraidCredentialsProvider), + unsubscribe: nil, + } + + credentials, cancel, err := cp.Subscribe(deadlockListener) + require.NoError(t, err) + require.NotNil(t, credentials) + require.NotNil(t, cancel) + + deadlockListener.unsubscribe = cancel + + // After the fix, this should complete without deadlock + done := make(chan bool, 1) + timeout := time.After(2 * time.Second) + + go func() { + cp.(*entraidCredentialsProvider).onTokenNext(testToken) + done <- true + }() + + select { + case <-done: + // Test passes - no deadlock occurred + t.Log("Success: No deadlock detected after fix") + case <-timeout: + t.Fatal("Deadlock still present - fix not working correctly") + } + }) +} + +// TestCredentialsProviderEdgeCases tests additional edge cases related to the deadlock bug +func TestCredentialsProviderEdgeCases(t *testing.T) { + t.Run("multiple listeners with mixed unsubscribe behavior", func(t *testing.T) { + testToken := token.New( + "test", + "test", + rawTokenString, + time.Now().Add(time.Hour), + time.Now(), + time.Hour.Milliseconds(), + ) + + tm := &fakeTokenManager{ + token: testToken, + } + + cp, err := NewCredentialsProvider(tm, CredentialsProviderOptions{}) + require.NoError(t, err) + require.NotNil(t, cp) + + // Create a mix of normal listeners and deadlock-inducing listeners + normalListener := &mockCredentialsListener{ + LastTokenCh: make(chan string, 1), + LastErrCh: make(chan error, 1), + } + + deadlockListener := &deadlockInducingListener{ + provider: cp.(*entraidCredentialsProvider), + unsubscribe: nil, + } + + // Subscribe both listeners + _, cancel1, err := cp.Subscribe(normalListener) + require.NoError(t, err) + defer cancel1() + + _, cancel2, err := cp.Subscribe(deadlockListener) + require.NoError(t, err) + deadlockListener.unsubscribe = cancel2 + + // This should cause deadlock due to the deadlock-inducing listener + done := make(chan bool, 1) + timeout := time.After(3 * time.Second) + + go func() { + cp.(*entraidCredentialsProvider).onTokenNext(testToken) + done <- true + }() + + select { + case <-done: + t.Log("No deadlock detected - this indicates the bug might be fixed") + case <-timeout: + t.Fatal("Deadlock detected with mixed listener types") + } + }) + + t.Run("rapid subscribe and unsubscribe operations", func(t *testing.T) { + testToken := token.New( + "test", + "test", + rawTokenString, + time.Now().Add(time.Hour), + time.Now(), + time.Hour.Milliseconds(), + ) + + tm := &fakeTokenManager{ + token: testToken, + } + + cp, err := NewCredentialsProvider(tm, CredentialsProviderOptions{}) + require.NoError(t, err) + require.NotNil(t, cp) + + // Rapidly subscribe and unsubscribe while triggering token updates + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10; j++ { + listener := &mockCredentialsListener{ + LastTokenCh: make(chan string, 1), + LastErrCh: make(chan error, 1), + } + _, cancel, err := cp.Subscribe(listener) + if err == nil && cancel != nil { + _ = cancel() + } + } + }() + } + + // Concurrent token updates + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 20; i++ { + cp.(*entraidCredentialsProvider).onTokenNext(testToken) + time.Sleep(time.Millisecond) + } + }() + + // Wait for completion with timeout + done := make(chan bool, 1) + go func() { + wg.Wait() + done <- true + }() + + select { + case <-done: + t.Log("Rapid subscribe/unsubscribe test completed successfully") + case <-time.After(10 * time.Second): + t.Fatal("Rapid subscribe/unsubscribe test timed out - possible deadlock") + } + }) +}