Skip to content

Commit

Permalink
UPSTREAM: <carry>: patch aggregator to allow delegating resources
Browse files Browse the repository at this point in the history
UPSTREAM: <carry>: prevent apiservice registration by CRD controller when delegating

UPSTREAM: <carry>: prevent CRD registration from fighting with APIServices

UPSTREAM: <carry>: always delegate namespaced resources

OpenShift-Rebase-Source: d4cd0ba
  • Loading branch information
mfojtik authored and soltysh committed Dec 8, 2023
1 parent aca5da8 commit 16fc8ea
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 12 deletions.
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/apiserver"
)

// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
Expand Down Expand Up @@ -193,6 +194,10 @@ func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResour
func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
apiServiceName := groupVersion.Version + "." + groupVersion.Group

if apiserver.APIServiceAlreadyExists(groupVersion) {
return nil
}

// check all CRDs. There shouldn't that many, but if we have problems later we can index them
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
Expand Down
40 changes: 28 additions & 12 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Expand Up @@ -152,6 +152,9 @@ type APIAggregator struct {
// is the versions for the group.
handledGroupVersions map[string]sets.Set[string]

// handledAlwaysLocalDelegatePaths are the URL paths that already have routes registered
handledAlwaysLocalDelegatePaths sets.String

// lister is used to add group handling for /apis/<group> aggregator lookups based on
// controller state
lister listers.APIServiceLister
Expand Down Expand Up @@ -236,18 +239,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
}

s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransportDial: proxyTransportDial,
proxyHandlers: map[string]*proxyHandler{},
handledGroupVersions: map[string]sets.Set[string]{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransportDial: proxyTransportDial,
proxyHandlers: map[string]*proxyHandler{},
handledGroupVersions: map[string]sets.Set[string]{},
handledAlwaysLocalDelegatePaths: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
}

// used later to filter the served resource by those that have expired.
Expand Down Expand Up @@ -569,6 +573,18 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
return nil
}

// For some resources we always want to delegate to local API server.
// These resources have to exists as CRD to be served locally.
for _, alwaysLocalDelegatePath := range alwaysLocalDelegatePathPrefixes.List() {
if s.handledAlwaysLocalDelegatePaths.Has(alwaysLocalDelegatePath) {
continue
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(alwaysLocalDelegatePath, proxyHandler.localDelegate)
// Always use local delegate for this prefix
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(alwaysLocalDelegatePath+"/", proxyHandler.localDelegate)
s.handledAlwaysLocalDelegatePaths.Insert(alwaysLocalDelegatePath)
}

// it's time to register the group aggregation endpoint
groupPath := "/apis/" + apiService.Spec.Group
groupDiscoveryHandler := &apiGroupHandler{
Expand Down
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
Expand Down Expand Up @@ -113,6 +114,14 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

// some groupResources should always be delegated
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
if alwaysLocalDelegateGroupResource[schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}] {
r.localDelegate.ServeHTTP(w, req)
return
}
}

if !handlingInfo.serviceAvailable {
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return
Expand Down
@@ -0,0 +1,49 @@
package apiserver

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
)

// alwaysLocalDelegatePrefixes specify a list of API paths that we want to delegate to Kubernetes API server
// instead of handling with OpenShift API server.
var alwaysLocalDelegatePathPrefixes = sets.NewString()

// AddAlwaysLocalDelegateForPrefix will cause the given URL prefix always be served by local API server (kube apiserver).
// This allows to move some resources from aggregated API server into CRD.
func AddAlwaysLocalDelegateForPrefix(prefix string) {
if alwaysLocalDelegatePathPrefixes.Has(prefix) {
return
}
alwaysLocalDelegatePathPrefixes.Insert(prefix)
}

var overlappingGroupVersion = map[schema.GroupVersion]bool{}

// AddOverlappingGroupVersion will stop the CRD registration controller from trying to manage an APIService.
func AddOverlappingGroupVersion(groupVersion schema.GroupVersion) {
overlappingGroupVersion[groupVersion] = true
}

var alwaysLocalDelegateGroupResource = map[schema.GroupResource]bool{}

func AddAlwaysLocalDelegateGroupResource(groupResource schema.GroupResource) {
alwaysLocalDelegateGroupResource[groupResource] = true
}

func APIServiceAlreadyExists(groupVersion schema.GroupVersion) bool {
if overlappingGroupVersion[groupVersion] {
return true
}

testPrefix := fmt.Sprintf("/apis/%s/%s/", groupVersion.Group, groupVersion.Version)
for _, prefix := range alwaysLocalDelegatePathPrefixes.List() {
if strings.HasPrefix(prefix, testPrefix) {
return true
}
}
return false
}

0 comments on commit 16fc8ea

Please sign in to comment.