Skip to content

Commit

Permalink
UPSTREAM: <carry>: patch aggregator to allow delegating resources
Browse files Browse the repository at this point in the history
Origin-commit: 14ba1f8ece9a7bb00ececb2a35b5f8f5fbeacc83

UPSTREAM: <carry>: prevent apiservice registration by CRD controller when delegating

Origin-commit: 3d216eab7adcbd8596606d72d31b6af621bfd350

UPSTREAM: <carry>: prevent CRD registration from fighting with APIServices

Origin-commit: c1c87eeade4730a2271cb98b4c6ea16af07e3e68

UPSTREAM: <carry>: always delegate namespaced resources

Origin-commit: 7f0815b5a88d57046a92fbdbc493bab2ad28a79c

openshift-rebase(v1.24):source=f9a6b73ca78

openshift-rebase(v1.24):source=f9a6b73ca78

openshift-rebase(v1.24):source=f9a6b73ca78
  • Loading branch information
mfojtik authored and soltysh committed Aug 18, 2022
1 parent 9da2e20 commit b8ff23a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/apiserver"
)

// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
Expand Down Expand Up @@ -193,6 +194,10 @@ func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResour
func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
apiServiceName := groupVersion.Version + "." + groupVersion.Group

if apiserver.APIServiceAlreadyExists(groupVersion) {
return nil
}

// check all CRDs. There shouldn't that many, but if we have problems later we can index them
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
Expand Down
40 changes: 28 additions & 12 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ type APIAggregator struct {
// handledGroups are the groups that already have routes
handledGroups sets.String

// handledAlwaysLocalDelegatePaths are the URL paths that already have routes registered
handledAlwaysLocalDelegatePaths sets.String

// lister is used to add group handling for /apis/<group> aggregator lookups based on
// controller state
lister listers.APIServiceLister
Expand Down Expand Up @@ -204,18 +207,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
}

s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
handledAlwaysLocalDelegatePaths: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
}

// used later to filter the served resource by those that have expired.
Expand Down Expand Up @@ -502,6 +506,18 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
return nil
}

// For some resources we always want to delegate to local API server.
// These resources have to exists as CRD to be served locally.
for _, alwaysLocalDelegatePath := range alwaysLocalDelegatePathPrefixes.List() {
if s.handledAlwaysLocalDelegatePaths.Has(alwaysLocalDelegatePath) {
continue
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(alwaysLocalDelegatePath, proxyHandler.localDelegate)
// Always use local delegate for this prefix
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(alwaysLocalDelegatePath+"/", proxyHandler.localDelegate)
s.handledAlwaysLocalDelegatePaths.Insert(alwaysLocalDelegatePath)
}

// it's time to register the group aggregation endpoint
groupPath := "/apis/" + apiService.Spec.Group
groupDiscoveryHandler := &apiGroupHandler{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
Expand Down Expand Up @@ -122,6 +123,14 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

// some groupResources should always be delegated
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
if alwaysLocalDelegateGroupResource[schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}] {
r.localDelegate.ServeHTTP(w, req)
return
}
}

if !handlingInfo.serviceAvailable {
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package apiserver

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
)

// alwaysLocalDelegatePrefixes specify a list of API paths that we want to delegate to Kubernetes API server
// instead of handling with OpenShift API server.
var alwaysLocalDelegatePathPrefixes = sets.NewString()

// AddAlwaysLocalDelegateForPrefix will cause the given URL prefix always be served by local API server (kube apiserver).
// This allows to move some resources from aggregated API server into CRD.
func AddAlwaysLocalDelegateForPrefix(prefix string) {
if alwaysLocalDelegatePathPrefixes.Has(prefix) {
return
}
alwaysLocalDelegatePathPrefixes.Insert(prefix)
}

var overlappingGroupVersion = map[schema.GroupVersion]bool{}

// AddOverlappingGroupVersion will stop the CRD registration controller from trying to manage an APIService.
func AddOverlappingGroupVersion(groupVersion schema.GroupVersion) {
overlappingGroupVersion[groupVersion] = true
}

var alwaysLocalDelegateGroupResource = map[schema.GroupResource]bool{}

func AddAlwaysLocalDelegateGroupResource(groupResource schema.GroupResource) {
alwaysLocalDelegateGroupResource[groupResource] = true
}

func APIServiceAlreadyExists(groupVersion schema.GroupVersion) bool {
if overlappingGroupVersion[groupVersion] {
return true
}

testPrefix := fmt.Sprintf("/apis/%s/%s/", groupVersion.Group, groupVersion.Version)
for _, prefix := range alwaysLocalDelegatePathPrefixes.List() {
if strings.HasPrefix(prefix, testPrefix) {
return true
}
}
return false
}

0 comments on commit b8ff23a

Please sign in to comment.