Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiserver: add callback to get notified of object count #103700

Merged
merged 1 commit into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
utilopenapi "k8s.io/apiserver/pkg/util/openapi"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
Expand Down Expand Up @@ -1134,23 +1135,25 @@ func (d unstructuredDefaulter) Default(in runtime.Object) {
}

type CRDRESTOptionsGetter struct {
StorageConfig storagebackend.Config
StoragePrefix string
EnableWatchCache bool
DefaultWatchCacheSize int
EnableGarbageCollection bool
DeleteCollectionWorkers int
CountMetricPollPeriod time.Duration
StorageConfig storagebackend.Config
StoragePrefix string
EnableWatchCache bool
DefaultWatchCacheSize int
EnableGarbageCollection bool
DeleteCollectionWorkers int
CountMetricPollPeriod time.Duration
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}

func (t CRDRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &t.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: t.EnableGarbageCollection,
DeleteCollectionWorkers: t.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: t.CountMetricPollPeriod,
StorageConfig: &t.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: t.EnableGarbageCollection,
DeleteCollectionWorkers: t.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: t.CountMetricPollPeriod,
StorageObjectCountTracker: t.StorageObjectCountTracker,
}
if t.EnableWatchCache {
ret.Decorator = genericregistry.StorageWithCacher()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,14 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
// NewCRDRESTOptionsGetter create a RESTOptionsGetter for CustomResources.
func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) genericregistry.RESTOptionsGetter {
ret := apiserver.CRDRESTOptionsGetter{
StorageConfig: etcdOptions.StorageConfig,
StoragePrefix: etcdOptions.StorageConfig.Prefix,
EnableWatchCache: etcdOptions.EnableWatchCache,
DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize,
EnableGarbageCollection: etcdOptions.EnableGarbageCollection,
DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers,
CountMetricPollPeriod: etcdOptions.StorageConfig.CountMetricPollPeriod,
StorageConfig: etcdOptions.StorageConfig,
StoragePrefix: etcdOptions.StorageConfig.Prefix,
EnableWatchCache: etcdOptions.EnableWatchCache,
DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize,
EnableGarbageCollection: etcdOptions.EnableGarbageCollection,
DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers,
CountMetricPollPeriod: etcdOptions.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: etcdOptions.StorageConfig.StorageObjectCountTracker,
}
ret.StorageConfig.Codec = unstructured.UnstructuredJSONScheme

Expand Down
10 changes: 6 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/registry/generic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -30,10 +31,11 @@ type RESTOptions struct {
StorageConfig *storagebackend.Config
Decorator StorageDecorator

EnableGarbageCollection bool
DeleteCollectionWorkers int
ResourcePrefix string
CountMetricPollPeriod time.Duration
EnableGarbageCollection bool
DeleteCollectionWorkers int
ResourcePrefix string
CountMetricPollPeriod time.Duration
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}

// Implement RESTOptionsGetter so that RESTOptions can directly be used when available (i.e. tests)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/util/dryrun"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"

Expand Down Expand Up @@ -1413,7 +1414,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
e.StorageVersioner = opts.StorageConfig.EncodeVersioner

if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
previousDestroy := e.DestroyFunc
e.DestroyFunc = func() {
stopFunc()
Expand All @@ -1428,7 +1429,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
}

// startObservingCount starts monitoring given prefix and periodically updating metrics. It returns a function to stop collection.
func (e *Store) startObservingCount(period time.Duration) func() {
func (e *Store) startObservingCount(period time.Duration, objectCountTracker flowcontrolrequest.StorageObjectCountTracker) func() {
prefix := e.KeyRootFunc(genericapirequest.NewContext())
resourceName := e.DefaultQualifiedResource.String()
klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "<storage-prefix>/"+prefix)
Expand All @@ -1437,9 +1438,12 @@ func (e *Store) startObservingCount(period time.Duration) func() {
count, err := e.Storage.Count(prefix)
if err != nil {
klog.V(5).InfoS("Failed to update storage count metric", "err", err)
metrics.UpdateObjectCount(resourceName, -1)
} else {
metrics.UpdateObjectCount(resourceName, count)
count = -1
}

metrics.UpdateObjectCount(resourceName, count)
if objectCountTracker != nil {
objectCountTracker.Set(resourceName, count)
}
}, period, resourceCountPollPeriodJitter, true, stopCh)
return func() { close(stopCh) }
Expand Down
11 changes: 9 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ type Config struct {
// it's intentionally marked private as it should never be overridden.
lifecycleSignals lifecycleSignals

// StorageObjectCountTracker is used to keep track of the total number of objects
// in the storage per resource, so we can estimate width of incoming requests.
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker

//===========================================================================
// values below here are targets for removal
//===========================================================================
Expand Down Expand Up @@ -312,6 +316,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
if feature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
id = "kube-apiserver-" + uuid.New().String()
}
lifecycleSignals := newLifecycleSignals()

return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
Expand Down Expand Up @@ -349,8 +355,9 @@ func NewConfig(codecs serializer.CodecFactory) *Config {

// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
lifecycleSignals: newLifecycleSignals(),
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
lifecycleSignals: lifecycleSignals,
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(lifecycleSignals.ShutdownInitiated.Signaled()),

APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),
Expand Down
33 changes: 21 additions & 12 deletions staging/src/k8s.io/apiserver/pkg/server/options/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
}
}

// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker

c.RESTOptionsGetter = &SimpleRestOptionsFactory{
Options: *s,
TransformerOverrides: transformerOverrides,
Expand All @@ -217,6 +220,10 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}

// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker

c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
Expand Down Expand Up @@ -248,12 +255,13 @@ type SimpleRestOptionsFactory struct {

func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.TransformerOverrides != nil {
if transformer, ok := f.TransformerOverrides[resource]; ok {
Expand Down Expand Up @@ -290,12 +298,13 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
}

ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/value"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

const (
Expand Down Expand Up @@ -84,6 +85,10 @@ type Config struct {
HealthcheckTimeout time.Duration

LeaseManagerConfig etcd3.LeaseManagerConfig

// StorageObjectCountTracker is used to keep track of the total
// number of objects in the storage per resource.
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}

func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
Expand Down