Skip to content

Commit

Permalink
Use active teardown count to determine when to requeue an update
Browse files Browse the repository at this point in the history
  • Loading branch information
richabanker committed Apr 6, 2024
1 parent c0ed2bc commit c0f169e
Show file tree
Hide file tree
Showing 6 changed files with 418 additions and 708 deletions.
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/server.go
Expand Up @@ -185,7 +185,7 @@ func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregat
}

// aggregator comes last in the chain
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.CRDInformers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
Expand Down
Expand Up @@ -51,9 +51,14 @@ import (
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)

const (
storageversionUpdateWorkers = 5
)

var (
Scheme = runtime.NewScheme()
Codecs = serializer.NewCodecFactory(Scheme)
Expand Down Expand Up @@ -111,7 +116,9 @@ type CustomResourceDefinitions struct {
GenericAPIServer *genericapiserver.GenericAPIServer

// provided for easier embedding
Informers externalinformers.SharedInformerFactory
CRDInformers externalinformers.SharedInformerFactory

StorageVersionInformers informers.SharedInformerFactory
}

// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
Expand Down Expand Up @@ -176,7 +183,13 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// we need to be able to move forward
return nil, fmt.Errorf("failed to create clientset: %v", err)
}
s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
s.CRDInformers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

kubeclientset, err := kubernetes.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create clientset for storage versions: %w", err)
}
s.StorageVersionInformers = informers.NewSharedInformerFactory(kubeclientset, time.Second)

delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
Expand All @@ -191,23 +204,22 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
establishingController := establish.NewEstablishingController(s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())

crdInformer := s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions()
svInformer := s.StorageVersionInformers.Internal().V1alpha1().StorageVersions()

var storageVersionManager *storageversion.Manager
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
kubeclientset, err := kubernetes.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create clientset for storage versions: %w", err)
}
sc := kubeclientset.InternalV1alpha1().StorageVersions()
storageVersionManager = storageversion.NewManager(sc, c.GenericConfig.APIServerID)
storageVersionManager = storageversion.NewManager(sc, c.GenericConfig.APIServerID, crdInformer, svInformer)
}

crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
crdInformer,
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
Expand All @@ -233,18 +245,22 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
if aggregatedDiscoveryManager != nil {
aggregatedDiscoveryManager = aggregatedDiscoveryManager.WithSource(aggregated.CRDSource)
}
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, aggregatedDiscoveryManager)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
discoveryController := NewDiscoveryController(s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, aggregatedDiscoveryManager)
namingController := status.NewNamingConditionController(s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions(),
crdClient.ApiextensionsV1(),
crdHandler,
)

s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
s.CRDInformers.Start(context.StopCh)
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
s.StorageVersionInformers.Start(context.StopCh)
}
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
Expand All @@ -254,12 +270,12 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
if s.GenericAPIServer.StaticOpenAPISpec != nil {
if s.GenericAPIServer.OpenAPIVersionedService != nil {
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
openapiController := openapicontroller.NewController(s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions())
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}

if s.GenericAPIServer.OpenAPIV3VersionedService != nil {
openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
openapiv3Controller := openapiv3controller.NewController(s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions())
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
}
}
Expand All @@ -272,9 +288,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)

if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
// this goroutine will just monitor when its time for the storageversion
// manager to shutdown its queues.
go crdHandler.storageVersionManager.Shutdown(context.StopCh)
go crdHandler.storageVersionManager.Sync(context.StopCh, storageversionUpdateWorkers)
}

discoverySyncedCh := make(chan struct{})
Expand All @@ -291,17 +305,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// but we won't go healthy until we can handle the ones already present.
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
if !s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
return false, nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
if !crdHandler.storageVersionManager.SyncSVOnStartup(s.Informers.Apiextensions().V1().CustomResourceDefinitions()) {
return false, nil
}
if s.CRDInformers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
close(hasCRDInformerSyncedSignal)
return true, nil
}
close(hasCRDInformerSyncedSignal)
return true, nil
return false, nil
}, context.StopCh)
})

Expand Down

0 comments on commit c0f169e

Please sign in to comment.