Deflake tests in staging/src/k8s.io/kube-aggregator/pkg/apiserver #115859

Merged
merged 2 commits on Mar 1, 2023
@@ -23,6 +23,8 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -37,19 +39,22 @@ import (
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
scheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)

func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
return NewDiscoveryManager(rm).(*discoveryManager)
dm := NewDiscoveryManager(rm).(*discoveryManager)
dm.dirtyAPIServiceQueue = newCompleterWorkqueue(dm.dirtyAPIServiceQueue)

return dm
}

// Returns true if the queue of services to sync empty this means everything has
// been reconciled and placed into merged document
func waitForEmptyQueue(stopCh <-chan struct{}, dm *discoveryManager) bool {
// Returns true if the queue of services to sync is complete, which means
// everything has been reconciled and placed into the merged document
func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
return cache.WaitForCacheSync(stopCh, func() bool {
// Once items have successfully synced they are removed from queue.
return dm.dirtyAPIServiceQueue.Len() == 0
return dm.dirtyAPIServiceQueue.(*completerWorkqueue).isComplete()
})
}
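
A minimal standalone sketch, not part of this PR, of why polling Len() == 0 alone was flaky: a plain workqueue removes an item from its queue as soon as a worker calls Get(), so the length reads zero while the handler for that item is still running, which is exactly the window the completerWorkqueue below closes by also tracking in-flight items until Done(). The program assumes only the standard client-go workqueue package; its other names are hypothetical.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()
	q.Add("v1.stable.example.com")

	go func() {
		item, _ := q.Get()          // the item leaves the queue here...
		time.Sleep(1 * time.Second) // ...but its sync work is still in flight
		q.Done(item)                // only now is the item truly complete
	}()

	time.Sleep(100 * time.Millisecond)
	fmt.Println("queue length:", q.Len()) // prints 0 while the worker is still busy
}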

@@ -103,7 +108,7 @@ func TestBasic(t *testing.T) {

go aggregatedManager.Run(testCtx.Done())

require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))

response, _, parsed := fetchPath(aggregatedResourceManager, "")
if response.StatusCode != 200 {
@@ -134,7 +139,43 @@ func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList,
// Test that a handler associated with an APIService gets pinged after the
// APIService has been marked as dirty
func TestDirty(t *testing.T) {
pinged := false
var pinged atomic.Bool
service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager()

aggregatedManager := newDiscoveryManager(aggregatedResourceManager)

aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "v1.stable.example.com",
},
Spec: apiregistrationv1.APIServiceSpec{
Group: "stable.example.com",
Version: "v1",
Service: &apiregistrationv1.ServiceReference{
Name: "test-service",
},
},
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pinged.Store(true)
service.ServeHTTP(w, r)
}))
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()

go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))

// immediately check for ping, since Run() should block for local services
if !pinged.Load() {
t.Errorf("service handler never pinged")
}
}

// Shows that waitForQueueComplete also waits for syncing to
// complete by artificially making the sync handler take a long time
func TestWaitForSync(t *testing.T) {
pinged := atomic.Bool{}
service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager()

@@ -152,17 +193,18 @@ func TestDirty(t *testing.T) {
},
},
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pinged = true
time.Sleep(3 * time.Second)
pinged.Store(true)
service.ServeHTTP(w, r)
}))
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()

go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))

// immediately check for ping, since Run() should block for local services
if !pinged {
if !pinged.Load() {
t.Errorf("service handler never pinged")
}
}
@@ -211,7 +253,7 @@ func TestRemoveAPIService(t *testing.T) {
aggregatedManager.RemoveAPIService(s.Name)
}

require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))

response, _, parsed := fetchPath(aggyService, "")
if response.StatusCode != 200 {
@@ -355,7 +397,7 @@ func TestLegacyFallbackNoCache(t *testing.T) {
defer cancel()

go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))

// At this point external services have synced. Check if discovery document
// includes the legacy resources
@@ -464,7 +506,7 @@ func TestLegacyFallback(t *testing.T) {
defer cancel()

go aggregatedManager.Run(testCtx.Done())
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))

// At this point external services have synced. Check if discovery document
// includes the legacy resources
@@ -533,18 +575,18 @@ func TestNotModified(t *testing.T) {

// Important to wait here to ensure we prime the cache with the initial list
// of documents in order to exercise 304 Not Modified
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))

// Now add all groups. We excluded one group before so that AllServicesSynced
// could include it in this round. Now, if AllServicesSynced ever returns
// Now add all groups. We excluded one group before so that waitForQueueComplete
// could include it in this round. Now, if waitForQueueComplete ever returns
// true, it must have synced all the pre-existing groups before, which would
// return 304 Not Modified
for _, s := range apiServices {
aggregatedManager.AddAPIService(s, service)
}

// This would wait the full timeout on 1.26.0.
require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
}

// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
@@ -609,3 +651,48 @@ func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apid

return w.Result(), bytes, decoded
}

// completerWorkqueue is a workqueue.RateLimitingInterface that implements
// isComplete
type completerWorkqueue struct {
lock sync.Mutex
workqueue.RateLimitingInterface
processing map[interface{}]struct{}
}

var _ = workqueue.RateLimitingInterface(&completerWorkqueue{})

func newCompleterWorkqueue(wq workqueue.RateLimitingInterface) *completerWorkqueue {
return &completerWorkqueue{
RateLimitingInterface: wq,
processing: make(map[interface{}]struct{}),
}
}

func (q *completerWorkqueue) Add(item interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
q.processing[item] = struct{}{}
q.RateLimitingInterface.Add(item)
}

func (q *completerWorkqueue) AddAfter(item interface{}, duration time.Duration) {
q.Add(item)
}

func (q *completerWorkqueue) AddRateLimited(item interface{}) {
q.Add(item)
}
Comment on lines +679 to +685

Contributor

This feels weird to me, but I know neither of these is ever called by the discovery manager, since syncAPIService unconditionally returns nil. (The discovery manager refreshes all services on a minute interval unconditionally, so it does not make use of the rate limiter.)

I'm wondering if this or a future PR should refactor it to not use workqueue.RateLimitingInterface and instead use the normal workqueue.Interface. I think we can keep that in mind for a separate PR, to retain this PR's nice property of only changing test files.

Member Author

Refactoring it with a separate PR would be great.
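
A rough sketch of that suggested follow-up, assuming the discovery manager itself were refactored to accept a plain workqueue.Interface; this is hypothetical, not part of this PR, and reuses the sync and workqueue imports already present in this test file:

// Hypothetical sketch only: a completer built on workqueue.Interface rather
// than workqueue.RateLimitingInterface, so the AddAfter/AddRateLimited
// overrides are no longer needed.
type completerQueue struct {
	lock sync.Mutex
	workqueue.Interface
	processing map[interface{}]struct{}
}

func newCompleterQueue(wq workqueue.Interface) *completerQueue {
	return &completerQueue{Interface: wq, processing: make(map[interface{}]struct{})}
}

// Add records the item as in flight before handing it to the wrapped queue.
func (q *completerQueue) Add(item interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.processing[item] = struct{}{}
	q.Interface.Add(item)
}

// Done removes the item from the in-flight set once its sync has finished.
func (q *completerQueue) Done(item interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	delete(q.processing, item)
	q.Interface.Done(item)
}

// isComplete reports whether nothing is queued and nothing is in flight.
func (q *completerQueue) isComplete() bool {
	q.lock.Lock()
	defer q.lock.Unlock()
	return q.Len() == 0 && len(q.processing) == 0
}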


func (q *completerWorkqueue) Done(item interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
delete(q.processing, item)
q.RateLimitingInterface.Done(item)
}

func (q *completerWorkqueue) isComplete() bool {
q.lock.Lock()
defer q.lock.Unlock()
return q.Len() == 0 && len(q.processing) == 0
}