Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs with aggregated discovery #113764

Merged
merged 6 commits into from Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/aggregator.go
Expand Up @@ -137,7 +137,7 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
// Imbue all builtin group-priorities onto the aggregated discovery
if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
for gv, entry := range apiVersionPriorities {
aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupPriority(gv.Group, int(entry.group))
aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version))
}
}

Expand Down
Expand Up @@ -269,7 +269,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
Resources: aggregatedApiResourcesForDiscovery,
})
// Default priority for CRDs
c.resourceManager.SetGroupPriority(version.Group, 1000)
c.resourceManager.SetGroupVersionPriority(metav1.GroupVersion(version), 1000, 100)
}
return nil
}
Expand Down
Expand Up @@ -273,13 +273,17 @@ func TestResourceManagerExistingCRD(t *testing.T) {

env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().
SetGroupPriority(coolFooCRD.Spec.Group, 1000)
for _, v := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().
SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: v.Name}, 1000, 100)
}

env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().
SetGroupPriority(coolFooCRD.Spec.Group, 1000)
for _, v := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().
SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: v.Name}, 1000, 100)
}

env.Start(ctx)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
Expand All @@ -295,7 +299,10 @@ func TestResourceManagerAddedCRD(t *testing.T) {
env := setup()
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
for _, v := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().
SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: v.Name}, 1000, 100)
}

env.Start(ctx)

Expand Down Expand Up @@ -340,7 +347,9 @@ func TestMultipleCRDSameVersion(t *testing.T) {
require.NoError(t, err)
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
for _, versionEntry := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: versionEntry.Name}, 1000, 100)
}
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)

Expand All @@ -358,7 +367,9 @@ func TestMultipleCRDSameVersion(t *testing.T) {

env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, mergedDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
for _, versionEntry := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: versionEntry.Name}, 1000, 100)
}
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)
}
Expand Down Expand Up @@ -388,7 +399,9 @@ func TestDiscoveryControllerResourceManagerRemovedCRD(t *testing.T) {
// Resource Manager
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
for _, versionEntry := range coolFooCRD.Spec.Versions {
env.FakeResourceManager.Expect().SetGroupVersionPriority(metav1.GroupVersion{Group: coolFooCRD.Spec.Group, Version: versionEntry.Name}, 1000, 100)
}
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)

Expand Down
Expand Up @@ -110,14 +110,15 @@ func (f *fakeResourceManager) WaitForActions(ctx context.Context, timeout time.D
return err
}

func (f *recorderResourceManager) SetGroupPriority(groupName string, priority int) {
func (f *recorderResourceManager) SetGroupVersionPriority(gv metav1.GroupVersion, grouppriority, versionpriority int) {
f.lock.Lock()
defer f.lock.Unlock()

f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "SetGroupPriority",
Group: groupName,
Value: priority,
Type: "SetGroupVersionPriority",
Group: gv.Group,
Version: gv.Version,
Value: versionpriority,
})
}

Expand Down
Expand Up @@ -24,6 +24,7 @@ import (

apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"

"sync/atomic"
Expand All @@ -42,10 +43,12 @@ type ResourceManager interface {
// Thread-safe
AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery)

// Sets priority for a group for sorting discovery.
// If a priority is set before the group is known, the priority will be ignored
// Once a group is removed, the priority is forgotten.
SetGroupPriority(groupName string, priority int)
// Sets a priority to be used while sorting a specific group and
// group-version. If two versions report different priorities for
// the group, the higher one will be used. If the group is not
// known, the priority is ignored. The priority for this version
// is forgotten once the group-version is forgotten
SetGroupVersionPriority(gv metav1.GroupVersion, grouppriority, versionpriority int)

// Removes all group versions for a given group
// Thread-safe
Expand All @@ -71,28 +74,32 @@ type resourceDiscoveryManager struct {

// Writes protected by the lock.
// List of all apigroups & resources indexed by the resource manager
lock sync.RWMutex
apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery
apiGroupNames map[string]int
lock sync.RWMutex
apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery
versionPriorities map[metav1.GroupVersion]priorityInfo
}

type priorityInfo struct {
GroupPriorityMinimum int
VersionPriority int
}

func NewResourceManager() ResourceManager {
scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme)
utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme))
return &resourceDiscoveryManager{serializer: codecs, apiGroupNames: make(map[string]int)}
return &resourceDiscoveryManager{serializer: codecs, versionPriorities: make(map[metav1.GroupVersion]priorityInfo)}
}

func (rdm *resourceDiscoveryManager) SetGroupPriority(group string, priority int) {
func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) {
rdm.lock.Lock()
defer rdm.lock.Unlock()

if _, exists := rdm.apiGroupNames[group]; exists {
rdm.apiGroupNames[group] = priority
rdm.cache.Store(nil)
} else {
klog.Warningf("DiscoveryManager: Attempted to set priority for group %s but does not exist", group)
rdm.versionPriorities[gv] = priorityInfo{
GroupPriorityMinimum: groupPriorityMinimum,
VersionPriority: versionPriority,
}
rdm.cache.Store(nil)
}

func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) {
Expand All @@ -108,10 +115,25 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG
}
}

// Filter unused out apiGroupNames
for name := range rdm.apiGroupNames {
if _, exists := rdm.apiGroups[name]; !exists {
delete(rdm.apiGroupNames, name)
// Filter unused out priority entries
for gv := range rdm.versionPriorities {
entry, exists := rdm.apiGroups[gv.Group]
if !exists {
delete(rdm.versionPriorities, gv)
continue
}

containsVersion := false

for _, v := range entry.Versions {
if v.Version == gv.Version {
containsVersion = true
break
}
}

if !containsVersion {
delete(rdm.versionPriorities, gv)
}
}
}
Expand Down Expand Up @@ -161,7 +183,14 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value},
}
rdm.apiGroups[groupName] = group
rdm.apiGroupNames[groupName] = 0
}

gv := metav1.GroupVersion{Group: groupName, Version: value.Version}
if _, ok := rdm.versionPriorities[gv]; !ok {
rdm.versionPriorities[gv] = priorityInfo{
GroupPriorityMinimum: 1000,
VersionPriority: 15,
}
}

// Reset response document so it is recreated lazily
Expand Down Expand Up @@ -189,9 +218,9 @@ func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVer
return
}

delete(rdm.versionPriorities, apiGroup)
if len(group.Versions) == 0 {
delete(rdm.apiGroups, group.Name)
delete(rdm.apiGroupNames, group.Name)
}

// Reset response document so it is recreated lazily
Expand All @@ -203,7 +232,12 @@ func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) {
defer rdm.lock.Unlock()

delete(rdm.apiGroups, groupName)
delete(rdm.apiGroupNames, groupName)

for k := range rdm.versionPriorities {
if k.Group == groupName {
delete(rdm.versionPriorities, k)
}
}

// Reset response document so it is recreated lazily
rdm.cache.Store(nil)
Expand All @@ -215,17 +249,49 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2
// Re-order the apiGroups by their priority.
groups := []apidiscoveryv2beta1.APIGroupDiscovery{}
for _, group := range rdm.apiGroups {
groups = append(groups, *group.DeepCopy())
copied := *group.DeepCopy()

// Re-order versions based on their priority. Use kube-aware string
// comparison as a tie breaker
sort.SliceStable(copied.Versions, func(i, j int) bool {
iVersion := copied.Versions[i].Version
jVersion := copied.Versions[j].Version

iPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: iVersion}].VersionPriority
jPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: jVersion}].VersionPriority

// Sort by version string comparator if priority is equal
if iPriority == jPriority {
return version.CompareKubeAwareVersionStrings(iVersion, jVersion) > 0
}

// i sorts before j if it has a higher priority
return iPriority > jPriority
})

groups = append(groups, *copied.DeepCopy())

}

// For each group, determine the highest minimum group priority and use that
priorities := map[string]int{}
for gv, info := range rdm.versionPriorities {
if existing, exists := priorities[gv.Group]; exists {
if existing < info.GroupPriorityMinimum {
priorities[gv.Group] = info.GroupPriorityMinimum
}
} else {
priorities[gv.Group] = info.GroupPriorityMinimum
}
}

sort.SliceStable(groups, func(i, j int) bool {
iName := groups[i].Name
jName := groups[j].Name

// Default to 0 priority by default
iPriority := rdm.apiGroupNames[iName]
jPriority := rdm.apiGroupNames[jName]
iPriority := priorities[iName]
jPriority := priorities[jName]

// Sort discovery based on apiservice priority.
// Duplicated from staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helpers.go
Expand All @@ -234,7 +300,7 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2
return iName < jName
}

// i sorts before j if it has a lower priority
// i sorts before j if it has a higher priority
return iPriority > jPriority
})

Expand Down