Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add deregistration for paths #44399

Merged
merged 1 commit into from
Apr 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 63 additions & 10 deletions staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@ import (
"net/http"
"runtime/debug"
"sort"
"sync"
"sync/atomic"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

// PathRecorderMux wraps a mux object and records the registered exposedPaths. It is _not_ go routine safe.
// PathRecorderMux wraps a mux object and records the registered exposedPaths.
type PathRecorderMux struct {
mux *http.ServeMux
lock sync.Mutex
pathToHandler map[string]http.Handler

// mux stores an *http.ServeMux and is used to handle the actual serving
mux atomic.Value

// exposedPaths is the list of paths that should be shown at /
exposedPaths []string

// pathStacks holds the stacks of all registered paths. This allows us to show a more helpful message
Expand All @@ -37,10 +45,15 @@ type PathRecorderMux struct {

// NewPathRecorderMux creates a new PathRecorderMux with the given mux as the base mux.
func NewPathRecorderMux() *PathRecorderMux {
return &PathRecorderMux{
mux: http.NewServeMux(),
pathStacks: map[string]string{},
ret := &PathRecorderMux{
pathToHandler: map[string]http.Handler{},
mux: atomic.Value{},
exposedPaths: []string{},
pathStacks: map[string]string{},
}

ret.mux.Store(http.NewServeMux())
return ret
}

// ListedPaths returns the registered handler exposedPaths.
Expand All @@ -58,41 +71,81 @@ func (m *PathRecorderMux) trackCallers(path string) {
m.pathStacks[path] = string(debug.Stack())
}

// refreshMuxLocked creates a new mux and must be called while locked. Otherwise the view of handlers may
// not be consistent
func (m *PathRecorderMux) refreshMuxLocked() {
mux := http.NewServeMux()
for path, handler := range m.pathToHandler {
mux.Handle(path, handler)
}

m.mux.Store(mux)
}

// Unregister removes a path from the mux.
func (m *PathRecorderMux) Unregister(path string) {
m.lock.Lock()
defer m.lock.Unlock()

delete(m.pathToHandler, path)
delete(m.pathStacks, path)
for i := range m.exposedPaths {
if m.exposedPaths[i] == path {
m.exposedPaths = append(m.exposedPaths[:i], m.exposedPaths[i+1:]...)
break
}
}

m.refreshMuxLocked()
}

// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) Handle(path string, handler http.Handler) {
m.lock.Lock()
defer m.lock.Unlock()
m.trackCallers(path)

m.exposedPaths = append(m.exposedPaths, path)
m.mux.Handle(path, handler)
m.pathToHandler[path] = handler
m.refreshMuxLocked()
}

// HandleFunc registers the handler function for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) HandleFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
m.lock.Lock()
defer m.lock.Unlock()
m.trackCallers(path)

m.exposedPaths = append(m.exposedPaths, path)
m.mux.HandleFunc(path, handler)
m.pathToHandler[path] = http.HandlerFunc(handler)
m.refreshMuxLocked()
}

// UnlistedHandle registers the handler for the given pattern, but doesn't list it.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) UnlistedHandle(path string, handler http.Handler) {
m.lock.Lock()
defer m.lock.Unlock()
m.trackCallers(path)

m.mux.Handle(path, handler)
m.pathToHandler[path] = handler
m.refreshMuxLocked()
}

// UnlistedHandleFunc registers the handler function for the given pattern, but doesn't list it.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) UnlistedHandleFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
m.lock.Lock()
defer m.lock.Unlock()
m.trackCallers(path)

m.mux.HandleFunc(path, handler)
m.pathToHandler[path] = http.HandlerFunc(handler)
m.refreshMuxLocked()
}

// ServeHTTP makes it an http.Handler
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mux.ServeHTTP(w, r)
m.mux.Load().(*http.ServeMux).ServeHTTP(w, r)
}
37 changes: 37 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mux

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -30,3 +31,39 @@ func TestSecretHandlers(t *testing.T) {
assert.NotContains(t, c.ListedPaths(), "/secret")
assert.Contains(t, c.ListedPaths(), "/nonswagger")
}

func TestUnregisterHandlers(t *testing.T) {
first := 0
second := 0

c := NewPathRecorderMux()
s := httptest.NewServer(c)
defer s.Close()

c.UnlistedHandleFunc("/secret", func(http.ResponseWriter, *http.Request) {})
c.HandleFunc("/nonswagger", func(http.ResponseWriter, *http.Request) {
first = first + 1
})
assert.NotContains(t, c.ListedPaths(), "/secret")
assert.Contains(t, c.ListedPaths(), "/nonswagger")

resp, _ := http.Get(s.URL + "/nonswagger")
assert.Equal(t, first, 1)
assert.Equal(t, resp.StatusCode, http.StatusOK)

c.Unregister("/nonswagger")
assert.NotContains(t, c.ListedPaths(), "/nonswagger")

resp, _ = http.Get(s.URL + "/nonswagger")
assert.Equal(t, first, 1)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)

c.HandleFunc("/nonswagger", func(http.ResponseWriter, *http.Request) {
second = second + 1
})
assert.Contains(t, c.ListedPaths(), "/nonswagger")
resp, _ = http.Get(s.URL + "/nonswagger")
assert.Equal(t, first, 1)
assert.Equal(t, second, 1)
assert.Equal(t, resp.StatusCode, http.StatusOK)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package apiregistration
import (
"sort"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
)

func SortedByGroup(servers []*APIService) [][]*APIService {
Expand Down Expand Up @@ -57,3 +59,10 @@ func (s ByPriority) Less(i, j int) bool {
}
return s[i].Spec.Priority < s[j].Spec.Priority
}

// APIServiceNameToGroupVersion returns the GroupVersion for a given apiServiceName. The name
// must be valid, but any object you get back from an informer will be valid.
func APIServiceNameToGroupVersion(apiServiceName string) schema.GroupVersion {
tokens := strings.SplitN(apiServiceName, ".", 2)
return schema.GroupVersion{Group: tokens[1], Version: tokens[0]}
}
30 changes: 19 additions & 11 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
}
proxyHandler.updateAPIService(apiService, destinationHost)
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath+"/", proxyHandler)
s.GenericAPIServer.FallThroughHandler.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.FallThroughHandler.UnlistedHandle(proxyPath+"/", proxyHandler)

// if we're dealing with the legacy group, we're done here
if apiService.Name == legacyAPIServiceName {
Expand All @@ -241,20 +241,28 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
lister: s.lister,
serviceLister: s.serviceLister,
endpointsLister: s.endpointsLister,
delegate: s.GenericAPIServer.FallThroughHandler,
delegate: s.delegateHandler,
}
// aggregation is protected
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(groupPath+"/", groupDiscoveryHandler)
s.GenericAPIServer.FallThroughHandler.Handle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.FallThroughHandler.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
s.handledGroups.Insert(apiService.Spec.Group)
}

// RemoveAPIService removes the APIService from being handled. Later on it will disable the proxy endpoint.
// Right now it does nothing because our handler has to properly 404 itself since muxes don't unregister
// RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread.
func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
proxyHandler, exists := s.proxyHandlers[apiServiceName]
if !exists {
return
version := apiregistration.APIServiceNameToGroupVersion(apiServiceName)

proxyPath := "/apis/" + version.Group + "/" + version.Version
// v1. is a special case for the legacy API. It proxies to a wider set of endpoints.
if apiServiceName == legacyAPIServiceName {
proxyPath = "/api"
}
proxyHandler.removeAPIService()
s.GenericAPIServer.FallThroughHandler.Unregister(proxyPath)
s.GenericAPIServer.FallThroughHandler.Unregister(proxyPath + "/")
delete(s.proxyHandlers, apiServiceName)

// TODO unregister group level discovery when there are no more versions for the group
// We don't need this right away because the handler properly delegates when no versions are present
}
23 changes: 1 addition & 22 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package apiserver

import (
"net/http"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,24 +29,9 @@ import (

apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
apiregistrationv1alpha1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
)

// WithAPIs adds the handling for /apis and /apis/<group: -apiregistration.k8s.io>.
func WithAPIs(handler http.Handler, codecs serializer.CodecFactory, informer informers.APIServiceInformer, serviceLister v1listers.ServiceLister, endpointsLister v1listers.EndpointsLister) http.Handler {
apisHandler := &apisHandler{
codecs: codecs,
lister: informer.Lister(),
delegate: handler,
serviceLister: serviceLister,
endpointsLister: endpointsLister,
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
apisHandler.ServeHTTP(w, req)
})
}

// apisHandler serves the `/apis` endpoint.
// This is registered as a filter so that it never collides with any explictly registered endpoints
type apisHandler struct {
Expand Down Expand Up @@ -75,11 +59,6 @@ var discoveryGroup = metav1.APIGroup{
}

func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// if the URL is for OUR api group, serve it normally
if strings.HasPrefix(req.URL.Path+"/", "/apis/"+apiregistrationapi.GroupName+"/") {
r.delegate.ServeHTTP(w, req)
return
}
// don't handle URLs that aren't /apis
if req.URL.Path != "/apis" && req.URL.Path != "/apis/" {
r.delegate.ServeHTTP(w, req)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated, but I just noticed we lost content-type: application/json on the following URLs:

  • https://<server>/apis
  • https://<server>/apis/
  • https://<server>/apis/extensions
  • https://<server>/apis/extensions/

Oddly, the same urls against the unsecured port still return content-type: application/json. I didn't expect the handler (once you got through the outer filters) to be different.

Expand Down Expand Up @@ -210,7 +189,7 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

if len(apiServicesForGroup) == 0 {
http.Error(w, "", http.StatusNotFound)
r.delegate.ServeHTTP(w, req)
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,13 @@ func TestAPIGroupMissing(t *testing.T) {
t.Fatalf("expected %v, got %v", http.StatusForbidden, resp.StatusCode)
}

// groupName still has no api services for it (like it was deleted), it should 404
// groupName still has no api services for it (like it was deleted), it should delegate
resp, err = http.Get(server.URL + "/apis/groupName/")
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("expected %v, got %v", http.StatusNotFound, resp.StatusCode)
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("expected %v, got %v", http.StatusForbidden, resp.StatusCode)
}

// missing group should delegate still has no api services for it (like it was deleted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ type proxyHandlingInfo struct {
}

func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handlingInfo := r.handlingInfo.Load().(proxyHandlingInfo)
value := r.handlingInfo.Load()
if value == nil {
r.localDelegate.ServeHTTP(w, req)
return
}
handlingInfo := value.(proxyHandlingInfo)
if handlingInfo.local {
r.localDelegate.ServeHTTP(w, req)
return
Expand Down Expand Up @@ -197,7 +202,3 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
r.handlingInfo.Store(newInfo)
}

func (r *proxyHandler) removeAPIService() {
r.handlingInfo.Store(proxyHandlingInfo{})
}