Skip to content

Commit

Permalink
UPSTREAM: <carry>: merge v3 openapi discovery and specs for special g…
Browse files Browse the repository at this point in the history
…roups

that have kinds that are served by both CRDs
and external apiservers (eg openshift-apiserver)

this includes:
- authorization.openshift.io (rolebindingrestrictions served by a CRD)
- security.openshift.io (securitycontextconstraints served by a CRD)
- quota.openshift.io (clusterresourcequotas served by a CRD)

By merging all sources, we ensure that kinds served by a CRD will have
openapi discovery and spec available even when openshift-apiserver is
unavailable.
  • Loading branch information
atiratree authored and dinhxuanvu committed Apr 26, 2024
1 parent 8ea050d commit efc3c70
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 9 deletions.
Expand Up @@ -238,20 +238,18 @@ func (s *specProxier) getOpenAPIV3Root() handler3.OpenAPIV3Discovery {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()

merged := handler3.OpenAPIV3Discovery{
Paths: make(map[string]handler3.OpenAPIV3DiscoveryGroupVersion),
}
paths := make(map[string][]handler3.OpenAPIV3DiscoveryGroupVersion)

for _, apiServiceInfo := range s.apiServiceInfo {
if apiServiceInfo.discovery == nil {
continue
}

for key, item := range apiServiceInfo.discovery.Paths {
merged.Paths[key] = item
paths[key] = append(paths[key], item)
}
}
return merged
return mergeOpenAPIV3RootPaths(paths)
}

// handleDiscovery is the handler for OpenAPI V3 Discovery
Expand All @@ -278,18 +276,33 @@ func (s *specProxier) handleGroupVersion(w http.ResponseWriter, r *http.Request)
url := strings.SplitAfterN(r.URL.Path, "/", 4)
targetGV := url[3]

var eligibleURLs []string
eligibleURLsToAPIServiceInfos := make(map[string]*openAPIV3APIServiceInfo)

for _, apiServiceInfo := range s.apiServiceInfo {
if apiServiceInfo.discovery == nil {
continue
}

for key := range apiServiceInfo.discovery.Paths {
if targetGV == key {
apiServiceInfo.handler.ServeHTTP(w, r)
return
for key, value := range apiServiceInfo.discovery.Paths {
if targetGV == key && eligibleURLsToAPIServiceInfos[value.ServerRelativeURL] == nil {
// add only apiServices that do not duplicate ServerRelativeURL (path + hash)
eligibleURLsToAPIServiceInfos[value.ServerRelativeURL] = apiServiceInfo
eligibleURLs = append(eligibleURLs, value.ServerRelativeURL)
break
}
}
if len(eligibleURLsToAPIServiceInfos) > 0 && !strings.HasPrefix(targetGV, "apis/") {
// do not search for duplicates that are not part of apis/ prefix (eg. /version)
break
}
}

if len(eligibleURLs) > 0 {
delegateAndMergeHandleGroupVersion(w, r, eligibleURLs, eligibleURLsToAPIServiceInfos)
return
}

// No group-versions match the desired request
w.WriteHeader(404)
}
Expand Down
@@ -0,0 +1,284 @@
package aggregator

import (
"bytes"
"context"
"crypto/sha512"
"encoding/json"
"fmt"
"net/http"
neturl "net/url"
"sort"
"strconv"
"strings"
"time"

"github.com/munnerz/goautoneg"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/spec3"
)

// mergeOpenAPIV3RootPaths expects mapping of openapi v3 sub url key to multiple serverRelativeURLs
// these URLs can be backed by different ApiServices or CRDs.
//
// We expect duplicates for the following groups:
// authorization.openshift.io, security.openshift.io and quota.openshift.io
// which are backed by both CRD apiextension apiserver and openshift apiserver.
func mergeOpenAPIV3RootPaths(paths map[string][]handler3.OpenAPIV3DiscoveryGroupVersion) handler3.OpenAPIV3Discovery {
merged := handler3.OpenAPIV3Discovery{
Paths: make(map[string]handler3.OpenAPIV3DiscoveryGroupVersion),
}

for key, delegationURLs := range paths {
// some apiservices can have duplicate paths in openapi v3 discovery (same path and hash) as they are backed by the same apiserver
delegationUniqueURLs := sets.List(toUniqueRelativeURLs(delegationURLs))
// we either have just one url or a special URL like a /version
if len(delegationUniqueURLs) == 1 || (len(delegationUniqueURLs) > 1 && !hasPrefix(delegationUniqueURLs, "/openapi/v3/apis/")) {
merged.Paths[key] = handler3.OpenAPIV3DiscoveryGroupVersion{
ServerRelativeURL: delegationURLs[0].ServerRelativeURL, // take first found apiServiceInfo
}
} else {
newMergedURL, err := mergeURLETags(delegationUniqueURLs)
if err != nil {
klog.Errorf("failed create merged openapi v3 URL for: %s: %s", key, err.Error())
continue
}
merged.Paths[key] = handler3.OpenAPIV3DiscoveryGroupVersion{
ServerRelativeURL: newMergedURL.String(),
}

}
}
return merged
}

// delegateAndMergeHandleGroupVersion delegates requests to eligibleURLs and merges their output
//
// We expect to delegate and merge for the following groups:
// authorization.openshift.io, security.openshift.io and quota.openshift.io
// which are backed by both CRD apiextension apiserver and openshift apiserver.
//
// The other requests will be passed to the original apiServiceInfo handler.
func delegateAndMergeHandleGroupVersion(w http.ResponseWriter, r *http.Request, eligibleURLs []string, eligibleURLsToAPIServiceInfos map[string]*openAPIV3APIServiceInfo) {
if len(eligibleURLs) == 1 {
// fully delegate to the handler
eligibleURLsToAPIServiceInfos[eligibleURLs[0]].handler.ServeHTTP(w, r)
return
} else if len(eligibleURLs) > 1 {
mergedURL, err := mergeURLETags(eligibleURLs)
if err != nil {
klog.Errorf("failed to get mergedURL: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}

if !isHashCurrent(r.URL, mergedURL.Query().Get("hash")) {
http.Redirect(w, r, mergedURL.String(), 301)
return

}
var specs []*spec3.OpenAPI
var maxLastModified time.Time

for eligibleURL, apiServiceInfo := range eligibleURLsToAPIServiceInfos {
writer := newInMemoryResponseWriter()
req, err := createNewAPIServiceRequest(r, eligibleURL)
if err != nil {
klog.Errorf("failed to create request: %s", err.Error())
continue
}
// delegate to multiple apiService handlers
apiServiceInfo.handler.ServeHTTP(writer, req)
lastModified, err := time.Parse(time.RFC1123, writer.Header().Get("Last-Modified"))
if err != nil {
klog.Warningf("not received Last-Modified in RFC1123 format: %s", err.Error())
} else if lastModified.After(maxLastModified) {
maxLastModified = lastModified
}

spec := spec3.OpenAPI{}
if err := json.Unmarshal(writer.data, &spec); err != nil {
klog.Errorf("failed to unmarshal OpenAPI for openapiService %v/%v: %s", apiServiceInfo.apiService.Namespace, apiServiceInfo.apiService.Name, err.Error())
continue
}
specs = append(specs, &spec)
}

// prefer info and version from external apiServices (will result in openshift title and description)
sort.Slice(specs, func(i, j int) bool {
if info := specs[i].Info; info != nil && strings.HasPrefix(strings.ToLower(info.Title), "kubernetes") {
return false
}
return true
})
mergedSpec, err := mergeSpecsV3(specs...)
if err != nil {
klog.Errorf("failed to merge spec: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}
mergedSpecJSON, _ := json.Marshal(mergedSpec)

if maxLastModified.IsZero() {
maxLastModified = time.Now()
}

openAPIHandleGroupVersion(w, r, mergedSpecJSON, mergedURL.Query().Get("hash"), maxLastModified)
}
}

// openAPIHandleGroupVersion is mostly copied from https://github.com/kubernetes/kube-openapi/blob/3c0fae5ee9fdc4e0cb7abff6fd66784a1f0dbcf8/pkg/handler3/handler.go#L222
func openAPIHandleGroupVersion(w http.ResponseWriter, r *http.Request, data []byte, etag string, lastModified time.Time) {
const (
subTypeProtobufDeprecated = "com.github.proto-openapi.spec.v3@v1.0+protobuf"
subTypeProtobuf = "com.github.proto-openapi.spec.v3.v1.0+protobuf"
subTypeJSON = "json"
)

decipherableFormats := r.Header.Get("Accept")
if decipherableFormats == "" {
decipherableFormats = "*/*"
}
clauses := goautoneg.ParseAccept(decipherableFormats)
w.Header().Add("Vary", "Accept")

if len(clauses) == 0 {
return
}

accepted := []struct {
Type string
SubType string
ReturnedContentType string
}{
{"application", subTypeJSON, "application/" + subTypeJSON},
{"application", subTypeProtobuf, "application/" + subTypeProtobuf},
{"application", subTypeProtobufDeprecated, "application/" + subTypeProtobuf},
}

for _, clause := range clauses {
for _, accepts := range accepted {
if clause.Type != accepts.Type && clause.Type != "*" {
continue
}
if clause.SubType != accepts.SubType && clause.SubType != "*" {
continue
}

switch accepts.SubType {
case subTypeProtobuf, subTypeProtobufDeprecated:
var err error
data, err = handler3.ToV3ProtoBinary(data)
if err != nil {
klog.Errorf("failed to convert json to proto: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
// Set Content-Type header in the reponse
w.Header().Set("Content-Type", accepts.ReturnedContentType)

// ETag must be enclosed in double quotes: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
w.Header().Set("Etag", strconv.Quote(etag))

if hash := r.URL.Query().Get("hash"); hash != "" {
// validity of hash checked in handleGroupVersion with isHashCurrent

// The Vary header is required because the Accept header can
// change the contents returned. This prevents clients from caching
// protobuf as JSON and vice versa.
w.Header().Set("Vary", "Accept")

// Only set these headers when a hash is given.
w.Header().Set("Cache-Control", "public, immutable")
// Set the Expires directive to the maximum value of one year from the request,
// effectively indicating that the cache never expires.
w.Header().Set("Expires", time.Now().AddDate(1, 0, 0).Format(time.RFC1123))
}
http.ServeContent(w, r, "", lastModified, bytes.NewReader(data))
return
}
}
w.WriteHeader(406)
return
}

func toUniqueRelativeURLs(relativeURLs []handler3.OpenAPIV3DiscoveryGroupVersion) sets.Set[string] {
uniqueURLs := sets.New[string]()
for _, url := range relativeURLs {
uniqueURLs.Insert(url.ServerRelativeURL)
}
return uniqueURLs
}

func hasPrefix(urls []string, prefix string) bool {
if len(urls) == 0 {
return false
}
for _, url := range urls {
if !strings.HasPrefix(url, prefix) {
return false
}
}
return true
}

func isHashCurrent(u *neturl.URL, currentETag string) bool {
if hash := u.Query().Get("hash"); len(hash) > 0 {
// check if hash is current only if requested
return hash == currentETag
}
return true
}

// computeETag is copied from https://github.com/kubernetes/kubernetes/blob/2c6c4566eff972d6c1320b5f8ad795f88c822d09/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/etag.go#L76
func computeETag(data []byte) string {
if data == nil {
return ""
}
return fmt.Sprintf("%X", sha512.Sum512(data))
}

func mergeURLETags(delegationURLs []string) (*neturl.URL, error) {
// presume all urls are the same, so take the first one
newURL, err := neturl.Parse(delegationURLs[0])
if err != nil {
return nil, err
}
if len(delegationURLs) == 1 {
return newURL, nil
}
// sorted, for consistent hash
delegationUniqueURLs := sets.List(sets.New(delegationURLs...))
delegationUniqueURLsBytes, err := json.Marshal(delegationUniqueURLs)
if err != nil {
return nil, err
}
etag := computeETag(delegationUniqueURLsBytes)

newQuery := newURL.Query()
newQuery.Set("hash", etag)
newURL.RawQuery = newQuery.Encode()
return newURL, nil
}

func createNewAPIServiceRequest(from *http.Request, eligibleURL string) (*http.Request, error) {
req := from.Clone(request.WithUser(context.Background(), &user.DefaultInfo{Name: aggregatorUser}))
req.Header.Set("Accept", "application/json")
if hash := req.URL.Query().Get("hash"); len(hash) > 0 {
eligibleParsedURL, err := neturl.Parse(eligibleURL)
if err != nil {
return nil, err
}
// rewrite to include the latest hash for this apiservice
q := req.URL.Query()
q.Set("hash", eligibleParsedURL.Query().Get("hash"))
req.URL.RawQuery = q.Encode()
}
return req, nil
}

0 comments on commit efc3c70

Please sign in to comment.