Skip to content

Commit

Permalink
Merge pull request #14052 from sttts/sttts-tri-state-watch-cache-size
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot committed May 23, 2017
2 parents 48e5e40 + d0797da commit c02131d
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 72 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/server/kubernetes/master/master_config.go
Expand Up @@ -70,6 +70,8 @@ import (
"github.com/openshift/origin/pkg/version"
)

const DefaultWatchCacheSize = 1000

// request paths that match this regular expression will be treated as long running
// and not subjected to the default server timeout.
const originLongRunningEndpointsRE = "(/|^)(buildconfigs/.*/instantiatebinary|imagestreamimports)$"
Expand Down Expand Up @@ -150,6 +152,7 @@ func BuildKubeAPIserverOptions(masterConfig configapi.MasterConfig) (*kapiserver
server.Etcd.StorageConfig.KeyFile = masterConfig.EtcdClientInfo.ClientCert.KeyFile
server.Etcd.StorageConfig.CertFile = masterConfig.EtcdClientInfo.ClientCert.CertFile
server.Etcd.StorageConfig.CAFile = masterConfig.EtcdClientInfo.CA
server.Etcd.DefaultWatchCacheSize = DefaultWatchCacheSize

server.GenericServerRunOptions.MaxRequestsInFlight = masterConfig.ServingInfo.MaxRequestsInFlight
server.GenericServerRunOptions.MinRequestTimeout = masterConfig.ServingInfo.RequestTimeoutSeconds
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/server/kubernetes/master/master_config_test.go
Expand Up @@ -78,6 +78,7 @@ func TestAPIServerDefaults(t *testing.T) {
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
EnableWatchCache: true,
DefaultWatchCacheSize: 100,
},
SecureServing: &apiserveroptions.SecureServingOptions{
ServingOptions: apiserveroptions.ServingOptions{
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/server/kubernetes/master/master_test.go
Expand Up @@ -25,7 +25,8 @@ func TestNewMasterLeasesHasCorrectTTL(t *testing.T) {
}

restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, 0, nil, "masterleases", nil, nil, nil, nil)
watchCacheDisabled := 0
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, &watchCacheDisabled, nil, "masterleases", nil, nil, nil, nil)
defer server.Terminate(t)

masterLeases := newMasterLeases(storageInterface)
Expand Down
21 changes: 10 additions & 11 deletions pkg/util/restoptions/configgetter.go
Expand Up @@ -21,9 +21,6 @@ import (
kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
)

// UseConfiguredCacheSize indicates that the configured cache size should be used
const UseConfiguredCacheSize = -1

// configRESTOptionsGetter provides RESTOptions based on a provided config
type configRESTOptionsGetter struct {
masterOptions configapi.MasterConfig
Expand Down Expand Up @@ -84,7 +81,7 @@ func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig
return &configRESTOptionsGetter{
masterOptions: masterOptions,
cacheEnabled: apiserverOptions.Etcd.EnableWatchCache,
defaultCacheSize: 1000,
defaultCacheSize: apiserverOptions.Etcd.DefaultWatchCacheSize,
cacheSizes: cacheSizes,
restOptionsMap: map[schema.GroupResource]generic.RESTOptions{},
defaultResourceConfig: defaultResourceConfig,
Expand Down Expand Up @@ -115,30 +112,32 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
if !specified || configuredCacheSize < 0 {
configuredCacheSize = g.defaultCacheSize
}
storageWithCacher := registry.StorageWithCacher(configuredCacheSize)

decorator := func(
copier runtime.ObjectCopier,
storageConfig *storagebackend.Config,
requestedSize int,
requestedSize *int,
objectType runtime.Object,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newListFn func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFn storage.TriggerPublisherFunc,
) (storage.Interface, factory.DestroyFunc) {
capacity := requestedSize
if capacity == UseConfiguredCacheSize {
capacity = configuredCacheSize
// use the origin default cache size, not the one in registry.StorageWithCacher
capacity := &configuredCacheSize
if requestedSize != nil {
capacity = requestedSize
}

if capacity == 0 || !g.cacheEnabled {
if *capacity == 0 || !g.cacheEnabled {
glog.V(5).Infof("using uncached watch storage for %s", resource.String())
return generic.UndecoratedStorage(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
}

glog.V(5).Infof("using watch cache storage (capacity=%d) for %s %#v", capacity, resource.String(), storageConfig)
return registry.StorageWithCacher(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
glog.V(5).Infof("using watch cache storage (capacity=%v) for %s %#v", *capacity, resource.String(), storageConfig)
return storageWithCacher(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
}

resourceOptions := generic.RESTOptions{
Expand Down
7 changes: 3 additions & 4 deletions test/extended/util/test.go
Expand Up @@ -293,10 +293,9 @@ var (
`\[Feature:Downgrade\]`,

// upstream flakes
`should provide basic identity`, // Basic StatefulSet functionality
`Scaling down before scale up is finished should wait until current pod will be running and ready before it will be removed`, // Basic StatefulSet functionality
`validates resource limits of pods that are allowed to run`, // SchedulerPredicates
`should idle the service and DeploymentConfig properly`, // idling with a single service and DeploymentConfig [Conformance]
`should provide basic identity`, // Basic StatefulSet functionality
`validates resource limits of pods that are allowed to run`, // SchedulerPredicates
`should idle the service and DeploymentConfig properly`, // idling with a single service and DeploymentConfig [Conformance]
}
excludedTestsFilter = regexp.MustCompile(strings.Join(excludedTests, `|`))

Expand Down
148 changes: 148 additions & 0 deletions test/integration/watch_cache_test.go
@@ -0,0 +1,148 @@
package integration

import (
"fmt"
"strconv"
"strings"
"testing"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"

configapi "github.com/openshift/origin/pkg/cmd/server/api"
serverkube "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
testutil "github.com/openshift/origin/test/util"
testserver "github.com/openshift/origin/test/util/server"
)

func testWatchCacheWithConfig(t *testing.T, master *configapi.MasterConfig, expectedCacheSize, counterExampleCacheSize int) {
clusterAdminKubeConfig, err := testserver.StartConfiguredMasterAPI(master)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

client, err := testutil.GetClusterAdminKubeClient(clusterAdminKubeConfig)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// create client with high burst value, otherwise we can only do 5 changes per second
config, err := testutil.GetClusterAdminClientConfig(clusterAdminKubeConfig)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
config.Burst = expectedCacheSize
patchClient, err := coreclient.NewForConfig(config)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// modify labels of default namespace expectedCacheSize + 1 times
defaultNS, err := client.Core().Namespaces().Get("default", metav1.GetOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
startVersion, err := strconv.Atoi(defaultNS.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
for i := 0; i < expectedCacheSize+1; i++ {
for r := 0; r < 10; r++ {
defaultNS, err = patchClient.Namespaces().Patch("default", types.StrategicMergePatchType,
[]byte(fmt.Sprintf(`{"metadata":{"labels":{"test":"%d"}}}`, i)))
if err != nil && !kerrors.IsConflict(err) {
t.Fatalf("unexpected patch error: %v", err)
}
if err == nil {
break
}
}
if err != nil {
t.Fatalf("too many retries: %v", err)
}
}

// do a versioned GET because it force the cache to sync
_, err = client.Core().Namespaces().Get("default", metav1.GetOptions{ResourceVersion: defaultNS.ResourceVersion})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// try watch with very old resource version, not really expectedCacheSize versions back (there
// might be other namespace changes which push the default namespace versions out of the cache.
// Also note that the resource versions are global in etcd. So other resources will also lead
// to resource version jumps.
lastVersion, err := strconv.Atoi(defaultNS.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error converting the resource version: %v", err)
}
w, err := client.Core().Namespaces().Watch(metav1.ListOptions{ResourceVersion: strconv.Itoa(lastVersion - (expectedCacheSize-counterExampleCacheSize)/2)})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer w.Stop()
ev := <-w.ResultChan()
if ev.Type == watch.Error {
t.Fatalf("unexpected event of error type: %v", ev)
}

// try watch with an version that is too old
w, err = client.Core().Namespaces().Watch(metav1.ListOptions{ResourceVersion: strconv.Itoa(startVersion - 1)})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer w.Stop()

// the first event will be of error type
goneErrMsg := "too old resource version"
ev = <-w.ResultChan()
if ev.Type != watch.Error {
t.Fatalf("expected an %q error as first event, got: %v", goneErrMsg, ev)
}
status, ok := ev.Object.(*metav1.Status)
if !ok {
t.Fatalf("expected a metav1.Status object in first event, got: %v", ev.Object)
}
if !strings.Contains(status.Message, goneErrMsg) {
t.Fatalf("expected an %q error, got: %v", goneErrMsg, err)
}
}

func TestDefaultWatchCacheSize(t *testing.T) {
testutil.RequireEtcd(t)
defer testutil.DumpEtcdOnFailure(t)

master, err := testserver.DefaultMasterOptions()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// test that the origin default really applies and that we don't fall back to kube's default
etcdOptions := apiserveroptions.NewEtcdOptions(&storagebackend.Config{})
kubeDefaultCacheSize := etcdOptions.DefaultWatchCacheSize
if kubeDefaultCacheSize != 100 {
t.Fatalf("upstream DefaultWatchCacheSize changed from 100 to %q", kubeDefaultCacheSize)
}
testWatchCacheWithConfig(t, master, serverkube.DefaultWatchCacheSize, kubeDefaultCacheSize)
}

func TestWatchCacheSizeWithFlag(t *testing.T) {
testutil.RequireEtcd(t)
defer testutil.DumpEtcdOnFailure(t)

master, err := testserver.DefaultMasterOptions()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if master.KubernetesMasterConfig.APIServerArguments == nil {
master.KubernetesMasterConfig.APIServerArguments = configapi.ExtendedArguments{}
}
master.KubernetesMasterConfig.APIServerArguments["watch-cache-sizes"] = []string{"namespaces#2000"}

testWatchCacheWithConfig(t, master, 2000, serverkube.DefaultWatchCacheSize)
}
10 changes: 5 additions & 5 deletions vendor/k8s.io/kubernetes/pkg/registry/cachesize/cachesize.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c02131d

Please sign in to comment.