Skip to content

Commit

Permalink
Merge pull request #77816 from liggitt/graceful-crd
Browse files Browse the repository at this point in the history
 Graceful custom resource storage teardown
  • Loading branch information
k8s-ci-robot committed May 14, 2019
2 parents 7446929 + ee215ba commit c85c0e4
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
Expand All @@ -68,6 +70,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
c.ExtraConfig.AuthResolverWrapper,
c.ExtraConfig.MasterCount,
s.GenericAPIServer.Authorizer,
c.GenericConfig.RequestTimeout,
)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers"
Expand All @@ -62,6 +64,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
Expand Down Expand Up @@ -100,6 +103,9 @@ type crdHandler struct {

// so that we can do create on update.
authorizer authorizer.Authorizer

// request timeout we should delay storage teardown for
requestTimeout time.Duration
}

// crdInfo stores enough information to serve the storage for the custom resource
Expand All @@ -123,6 +129,8 @@ type crdInfo struct {

// storageVersion is the CRD version used when storing the object in etcd.
storageVersion string

waitGroup *utilwaitgroup.SafeWaitGroup
}

// crdStorageMap goes from customresourcedefinition to its storage
Expand All @@ -139,7 +147,8 @@ func NewCustomResourceDefinitionHandler(
serviceResolver webhook.ServiceResolver,
authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
masterCount int,
authorizer authorizer.Authorizer) (*crdHandler, error) {
authorizer authorizer.Authorizer,
requestTimeout time.Duration) (*crdHandler, error) {
ret := &crdHandler{
versionDiscoveryHandler: versionDiscoveryHandler,
groupDiscoveryHandler: groupDiscoveryHandler,
Expand All @@ -151,6 +160,7 @@ func NewCustomResourceDefinitionHandler(
establishingController: establishingController,
masterCount: masterCount,
authorizer: authorizer,
requestTimeout: requestTimeout,
}
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: ret.updateCustomResourceDefinition,
Expand All @@ -169,6 +179,11 @@ func NewCustomResourceDefinitionHandler(
return ret, nil
}

// watches are expected to handle storage disruption gracefully,
// both on the server-side (by terminating the watch connection)
// and on the client side (by restarting the watch)
var longRunningFilter = genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString())

func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
Expand Down Expand Up @@ -238,7 +253,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
supportedTypes = append(supportedTypes, string(types.ApplyPatchType))
}

var handler http.HandlerFunc
var handlerFunc http.HandlerFunc
subresources, err := apiextensions.GetSubresourcesForVersion(crd, requestInfo.APIVersion)
if err != nil {
utilruntime.HandleError(err)
Expand All @@ -247,18 +262,19 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
switch {
case subresource == "status" && subresources != nil && subresources.Status != nil:
handler = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case subresource == "scale" && subresources != nil && subresources.Scale != nil:
handler = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case len(subresource) == 0:
handler = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
default:
http.Error(w, "the server could not find the requested resource", http.StatusNotFound)
}

if handler != nil {
handler = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handler)
handler(w, req)
if handlerFunc != nil {
handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc)
handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
handler.ServeHTTP(w, req)
return
}
}
Expand Down Expand Up @@ -365,18 +381,18 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})

klog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)

// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
storageMap2 := storageMap.clone()
if oldInfo, ok := storageMap2[types.UID(oldCRD.UID)]; ok {
for _, storage := range oldInfo.storages {
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
storage.CustomResource.DestroyFunc()
}
if oldInfo, ok := storageMap[types.UID(oldCRD.UID)]; ok {
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
storageMap2 := storageMap.clone()

// Remove from the CRD info map and store the map
delete(storageMap2, types.UID(oldCRD.UID))
}
r.customStorage.Store(storageMap2)

r.customStorage.Store(storageMap2)
// Tear down the old storage
go r.tearDown(oldInfo)
}
}

// removeDeadStorage removes REST storage that isn't being used
Expand All @@ -390,6 +406,7 @@ func (r *crdHandler) removeDeadStorage() {
r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()

oldInfos := []*crdInfo{}
storageMap := r.customStorage.Load().(crdStorageMap)
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere
Expand All @@ -404,14 +421,38 @@ func (r *crdHandler) removeDeadStorage() {
}
if !found {
klog.V(4).Infof("Removing dead CRD storage for %s/%s", s.spec.Group, s.spec.Names.Kind)
for _, storage := range s.storages {
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
storage.CustomResource.DestroyFunc()
}
oldInfos = append(oldInfos, s)
delete(storageMap2, uid)
}
}
r.customStorage.Store(storageMap2)

for _, s := range oldInfos {
go r.tearDown(s)
}
}

// Wait up to a minute for requests to drain, then tear down storage
func (r *crdHandler) tearDown(oldInfo *crdInfo) {
requestsDrained := make(chan struct{})
go func() {
defer close(requestsDrained)
// Allow time for in-flight requests with a handle to the old info to register themselves
time.Sleep(time.Second)
// Wait for in-flight requests to drain
oldInfo.waitGroup.Wait()
}()

select {
case <-time.After(r.requestTimeout * 2):
klog.Warningf("timeout waiting for requests to drain for %s/%s, tearing down storage", oldInfo.spec.Group, oldInfo.spec.Names.Kind)
case <-requestsDrained:
}

for _, storage := range oldInfo.storages {
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
storage.CustomResource.DestroyFunc()
}
}

// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter of
Expand Down Expand Up @@ -622,6 +663,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResource
scaleRequestScopes: scaleScopes,
statusRequestScopes: statusScopes,
storageVersion: storageVersion,
waitGroup: &utilwaitgroup.SafeWaitGroup{},
}

// Copy because we cannot write to storageMap without a race
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_test(
srcs = [
"apply_test.go",
"basic_test.go",
"change_test.go",
"finalization_test.go",
"objectmeta_test.go",
"registration_test.go",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"fmt"
"sync"
"testing"
"time"

apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
)

func TestChangeCRD(t *testing.T) {
tearDown, config, _, err := fixtures.StartDefaultServer(t)
if err != nil {
t.Fatal(err)
}
defer tearDown()
config.QPS = 1000
config.Burst = 1000
apiExtensionsClient, err := clientset.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}

noxuDefinition := fixtures.NewNoxuCustomResourceDefinition(apiextensionsv1beta1.NamespaceScoped)
noxuDefinition, err = fixtures.CreateNewCustomResourceDefinition(noxuDefinition, apiExtensionsClient, dynamicClient)
if err != nil {
t.Fatal(err)
}

ns := "default"
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")

stopChan := make(chan struct{})

wg := &sync.WaitGroup{}

// Set up loop to modify CRD in the background
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopChan:
return
default:
}

noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(noxuDefinition.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
v2.Name = "v2"
v2.Served = true
v2.Storage = false
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
} else {
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
}
if _, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(noxuDefinitionToUpdate); err != nil && !apierrors.IsConflict(err) {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
}
}()

// Set up 100 loops creating and reading custom resources
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
noxuInstanceToCreate := fixtures.NewNoxuInstance(ns, fmt.Sprintf("foo-%d", i))
if _, err := noxuNamespacedResourceClient.Create(noxuInstanceToCreate, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
for {
select {
case <-stopChan:
return
default:
if _, err := noxuNamespacedResourceClient.Get(noxuInstanceToCreate.GetName(), metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
}
time.Sleep(10 * time.Millisecond)
}
}(i)
}

// Let all the established get request loops soak
time.Sleep(5 * time.Second)

// Tear down
close(stopChan)

// Let loops drain
wg.Wait()
}

0 comments on commit c85c0e4

Please sign in to comment.