From 85a2d45b192e97f4597c626798ae5e1865b95e6d Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Mon, 5 Jun 2023 16:26:12 +0200
Subject: [PATCH] Make etcd component status consistent with health probes

Co-authored-by: Antonio Ojea
---
 pkg/registry/core/componentstatus/rest.go      |  8 +--
 .../core/componentstatus/rest_test.go          |  6 +-
 .../core/componentstatus/validator.go          | 31 ++++++++-
 .../core/componentstatus/validator_test.go     |  2 +-
 pkg/registry/core/rest/storage_core.go         | 45 ++----------
 pkg/registry/core/rest/storage_core_test.go    |  4 ++
 .../pkg/server/storage/storage_factory.go      | 69 ++++++++++++++++---
 .../server/storage/storage_factory_test.go     | 56 +++++++++++++++
 .../pkg/storage/etcd3/healthcheck.go           |  1 +
 .../storage/storagebackend/factory/etcd3.go    | 60 +++++++++++++---
 .../storage/storagebackend/factory/factory.go  | 18 +++++
 11 files changed, 233 insertions(+), 67 deletions(-)

diff --git a/pkg/registry/core/componentstatus/rest.go b/pkg/registry/core/componentstatus/rest.go
index 22fe41a0c84d..dcfa73b9708a 100644
--- a/pkg/registry/core/componentstatus/rest.go
+++ b/pkg/registry/core/componentstatus/rest.go
@@ -38,12 +38,12 @@ import (
 )
 
 type REST struct {
-	GetServersToValidate func() map[string]*Server
+	GetServersToValidate func() map[string]Server
 	rest.TableConvertor
 }
 
 // NewStorage returns a new REST.
-func NewStorage(serverRetriever func() map[string]*Server) *REST {
+func NewStorage(serverRetriever func() map[string]Server) *REST {
 	return &REST{
 		GetServersToValidate: serverRetriever,
 		TableConvertor:       printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
 	}
 }
@@ -77,7 +77,7 @@ func (rs *REST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
 	wait.Add(len(servers))
 	statuses := make(chan api.ComponentStatus, len(servers))
 	for k, v := range servers {
-		go func(name string, server *Server) {
+		go func(name string, server Server) {
 			defer wait.Done()
 			status := rs.getComponentStatus(name, server)
 			statuses <- *status
@@ -148,7 +148,7 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus {
 	}
 }
 
-func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus {
+func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus {
 	status, msg, err := server.DoServerCheck()
 	errorMsg := ""
 	if err != nil {
diff --git a/pkg/registry/core/componentstatus/rest_test.go b/pkg/registry/core/componentstatus/rest_test.go
index 62366ab4fbe6..9593f0db78e6 100644
--- a/pkg/registry/core/componentstatus/rest_test.go
+++ b/pkg/registry/core/componentstatus/rest_test.go
@@ -60,9 +60,9 @@ func NewTestREST(resp testResponse) *REST {
 		err: resp.err,
 	}
 	return &REST{
-		GetServersToValidate: func() map[string]*Server {
-			return map[string]*Server{
-				"test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
+		GetServersToValidate: func() map[string]Server {
+			return map[string]Server{
+				"test1": &HttpServer{Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
 			}
 		},
 	}
diff --git a/pkg/registry/core/componentstatus/validator.go b/pkg/registry/core/componentstatus/validator.go
index f3221745a7c0..20c10df94c81 100644
--- a/pkg/registry/core/componentstatus/validator.go
+++ b/pkg/registry/core/componentstatus/validator.go
@@ -17,11 +17,14 @@ limitations under the License.
 package componentstatus
 
 import (
+	"context"
 	"crypto/tls"
 	"sync"
 	"time"
 
 	utilnet "k8s.io/apimachinery/pkg/util/net"
+	"k8s.io/apiserver/pkg/storage/storagebackend"
+	"k8s.io/apiserver/pkg/storage/storagebackend/factory"
 	"k8s.io/kubernetes/pkg/probe"
 	httpprober "k8s.io/kubernetes/pkg/probe/http"
 )
@@ -32,7 +35,11 @@ const (
 
 type ValidatorFn func([]byte) error
 
-type Server struct {
+type Server interface {
+	DoServerCheck() (probe.Result, string, error)
+}
+
+type HttpServer struct {
 	Addr string
 	Port int
 	Path string
@@ -56,7 +63,7 @@ type ServerStatus struct {
 	Err string `json:"err,omitempty"`
 }
 
-func (server *Server) DoServerCheck() (probe.Result, string, error) {
+func (server *HttpServer) DoServerCheck() (probe.Result, string, error) {
 	// setup the prober
 	server.Once.Do(func() {
 		if server.Prober != nil {
@@ -87,3 +94,23 @@ func (server *Server) DoServerCheck() (probe.Result, string, error) {
 	}
 	return result, data, nil
 }
+
+type EtcdServer struct {
+	storagebackend.Config
+}
+
+func (server *EtcdServer) DoServerCheck() (probe.Result, string, error) {
+	prober, err := factory.CreateProber(server.Config)
+	if err != nil {
+		return probe.Failure, "", err
+	}
+	defer prober.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), probeTimeOut)
+	defer cancel()
+	err = prober.Probe(ctx)
+	if err != nil {
+		return probe.Failure, "", err
+	}
+	return probe.Success, "", err
+}
diff --git a/pkg/registry/core/componentstatus/validator_test.go b/pkg/registry/core/componentstatus/validator_test.go
index 9ff62ba7467f..b77fd369b55f 100644
--- a/pkg/registry/core/componentstatus/validator_test.go
+++ b/pkg/registry/core/componentstatus/validator_test.go
@@ -49,7 +49,7 @@ func TestValidate(t *testing.T) {
 		{probe.Success, "foo", nil, probe.Success, "foo", false, nil},
 	}
 
-	s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}
+	s := HttpServer{Addr: "foo.com", Port: 8080, Path: "/healthz"}
 
 	for _, test := range tests {
 		fakeProber := &fakeHttpProber{
diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go
index 206e87e8909a..e4607501674e 100644
--- a/pkg/registry/core/rest/storage_core.go
+++ b/pkg/registry/core/rest/storage_core.go
@@ -21,12 +21,8 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"net/url"
-	"strings"
 	"time"
 
-	"k8s.io/klog/v2"
-
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
@@ -35,7 +31,6 @@ import (
 	"k8s.io/apiserver/pkg/registry/rest"
 	genericapiserver "k8s.io/apiserver/pkg/server"
 	serverstorage "k8s.io/apiserver/pkg/server/storage"
-	"k8s.io/apiserver/pkg/storage/etcd3"
 	policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
@@ -65,7 +60,6 @@ import (
 	serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
 	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/serviceaccount"
-	utilsnet "k8s.io/utils/net"
 )
 
 // LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
@@ -387,43 +381,16 @@ type componentStatusStorage struct {
 	storageFactory serverstorage.StorageFactory
 }
 
-func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server {
+func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server {
 	// this is fragile, which assumes that the default port is being used
 	// TODO: switch to secure port until these components remove the ability to serve insecurely.
-	serversToValidate := map[string]*componentstatus.Server{
-		"controller-manager": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
-		"scheduler":          {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
+	serversToValidate := map[string]componentstatus.Server{
+		"controller-manager": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
+		"scheduler":          &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
 	}
-	for ix, machine := range s.storageFactory.Backends() {
-		etcdUrl, err := url.Parse(machine.Server)
-		if err != nil {
-			klog.Errorf("Failed to parse etcd url for validation: %v", err)
-			continue
-		}
-		var port int
-		var addr string
-		if strings.Contains(etcdUrl.Host, ":") {
-			var portString string
-			addr, portString, err = net.SplitHostPort(etcdUrl.Host)
-			if err != nil {
-				klog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
-				continue
-			}
-			port, _ = utilsnet.ParsePort(portString, true)
-		} else {
-			addr = etcdUrl.Host
-			port = 2379
-		}
-		// TODO: etcd health checking should be abstracted in the storage tier
-		serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{
-			Addr:        addr,
-			EnableHTTPS: etcdUrl.Scheme == "https",
-			TLSConfig:   machine.TLSConfig,
-			Port:        port,
-			Path:        "/health",
-			Validate:    etcd3.EtcdHealthCheck,
-		}
+	for ix, cfg := range s.storageFactory.Configs() {
+		serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.EtcdServer{Config: cfg}
 	}
 	return serversToValidate
 }
diff --git a/pkg/registry/core/rest/storage_core_test.go b/pkg/registry/core/rest/storage_core_test.go
index 7e674f17aa3c..411efa211dce 100644
--- a/pkg/registry/core/rest/storage_core_test.go
+++ b/pkg/registry/core/rest/storage_core_test.go
@@ -51,3 +51,7 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {
 func (f fakeStorageFactory) Backends() []storage.Backend {
 	return []storage.Backend{{Server: "etcd-0"}}
 }
+
+func (f fakeStorageFactory) Configs() []storagebackend.Config {
+	return []storagebackend.Config{{Transport: storagebackend.TransportConfig{ServerList: []string{"etcd-0"}}}}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go
index d8de4cd84fb4..3aebab7de8cf 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go
@@ -22,8 +22,6 @@ import (
 	"io/ioutil"
 	"strings"
 
-	"k8s.io/klog/v2"
-
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -31,6 +29,7 @@ import (
 	"k8s.io/apiserver/pkg/storage/storagebackend"
 	"k8s.io/apiserver/pkg/storage/value"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/klog/v2"
 )
 
 // Backend describes the storage servers, the information here should be enough
@@ -53,8 +52,12 @@ type StorageFactory interface {
 	// centralized control over the shape of etcd directories
 	ResourcePrefix(groupResource schema.GroupResource) string
 
+	// Configs gets configurations for all registered storage destinations.
+	Configs() []storagebackend.Config
+
 	// Backends gets all backends for all registered storage destinations.
 	// Used for getting all instances for health validations.
+	// Deprecated: Use Configs instead.
 	Backends() []Backend
 }
@@ -288,28 +291,76 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
 	return storageConfig.ForResource(groupResource), nil
 }
 
+// Configs implements StorageFactory.
+func (s *DefaultStorageFactory) Configs() []storagebackend.Config {
+	return configs(s.StorageConfig, s.Overrides)
+}
+
+// Configs gets configurations for all registered storage destinations.
+func Configs(storageConfig storagebackend.Config) []storagebackend.Config {
+	return configs(storageConfig, nil)
+}
+
+// configs returns all storage configurations, including those for group resource overrides.
+func configs(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []storagebackend.Config {
+	locations := sets.NewString()
+	configs := []storagebackend.Config{}
+	for _, loc := range storageConfig.Transport.ServerList {
+		// copy
+		newConfig := storageConfig
+		newConfig.Transport.ServerList = []string{loc}
+		configs = append(configs, newConfig)
+		locations.Insert(loc)
+	}
+
+	for _, override := range grOverrides {
+		for _, loc := range override.etcdLocation {
+			if locations.Has(loc) {
+				continue
+			}
+			// copy
+			newConfig := storageConfig
+			override.Apply(&newConfig, &StorageCodecConfig{})
+			newConfig.Transport.ServerList = []string{loc}
+			configs = append(configs, newConfig)
+			locations.Insert(loc)
+		}
+	}
+	return configs
+}
+
+// Backends implements StorageFactory.
+func (s *DefaultStorageFactory) Backends() []Backend {
+	return backends(s.StorageConfig, s.Overrides)
+}
+
 // Backends returns all backends for all registered storage destinations.
 // Used for getting all instances for health validations.
-func (s *DefaultStorageFactory) Backends() []Backend {
-	servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
+// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
+func Backends(storageConfig storagebackend.Config) []Backend {
+	return backends(storageConfig, nil)
+}
+
+func backends(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []Backend {
+	servers := sets.NewString(storageConfig.Transport.ServerList...)
 
-	for _, overrides := range s.Overrides {
+	for _, overrides := range grOverrides {
 		servers.Insert(overrides.etcdLocation...)
 	}
 
 	tlsConfig := &tls.Config{
 		InsecureSkipVerify: true,
 	}
-	if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
-		cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
+	if len(storageConfig.Transport.CertFile) > 0 && len(storageConfig.Transport.KeyFile) > 0 {
+		cert, err := tls.LoadX509KeyPair(storageConfig.Transport.CertFile, storageConfig.Transport.KeyFile)
 		if err != nil {
 			klog.Errorf("failed to load key pair while getting backends: %s", err)
 		} else {
 			tlsConfig.Certificates = []tls.Certificate{cert}
 		}
 	}
-	if len(s.StorageConfig.Transport.TrustedCAFile) > 0 {
-		if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil {
+	if len(storageConfig.Transport.TrustedCAFile) > 0 {
+		if caCert, err := ioutil.ReadFile(storageConfig.Transport.TrustedCAFile); err != nil {
 			klog.Errorf("failed to read ca file while getting backends: %s", err)
 		} else {
 			caPool := x509.NewCertPool()
diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go
index e0f07f24f6ce..84f01e09aa23 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory_test.go
@@ -185,3 +185,59 @@ func TestUpdateEtcdOverrides(t *testing.T) {
 		}
 	}
 }
+
+func TestConfigs(t *testing.T) {
+	exampleinstall.Install(scheme)
+	defaultEtcdLocations := []string{"http://127.0.0.1", "http://127.0.0.2"}
+
+	testCases := []struct {
+		resource    schema.GroupResource
+		servers     []string
+		wantConfigs []storagebackend.Config
+	}{
+		{
+			wantConfigs: []storagebackend.Config{
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
+			},
+		},
+		{
+			resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"},
+			servers:  []string{"http://127.0.0.1:10000"},
+			wantConfigs: []storagebackend.Config{
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true},
+			},
+		},
+		{
+			resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"},
+			servers:  []string{"http://127.0.0.1:10000", "https://127.0.0.1", "http://127.0.0.2"},
+			wantConfigs: []storagebackend.Config{
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true},
+				{Transport: storagebackend.TransportConfig{ServerList: []string{"https://127.0.0.1"}}, Prefix: "/registry", Paging: true},
+			},
+		},
+	}
+
+	for i, test := range testCases {
+		defaultConfig := storagebackend.Config{
+			Prefix: "/registry",
+			Transport: storagebackend.TransportConfig{
+				ServerList: defaultEtcdLocations,
+			},
+		}
+		storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil)
+		if len(test.servers) > 0 {
+			storageFactory.SetEtcdLocation(test.resource, test.servers)
+		}
+
+		got := storageFactory.Configs()
+		if !reflect.DeepEqual(test.wantConfigs, got) {
+			t.Errorf("%d: expected %v, got %v", i, test.wantConfigs, got)
+			continue
+		}
+	}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go
index ad051d2d6cdf..3d4898103789 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go
@@ -28,6 +28,7 @@ type etcdHealth struct {
 }
 
 // EtcdHealthCheck decodes data returned from etcd /healthz handler.
+// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
 func EtcdHealthCheck(data []byte) error {
 	obj := etcdHealth{}
 	if err := json.Unmarshal(data, &obj); err != nil {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
index 55e2cc0b42c9..2f9908bd3576 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
@@ -127,13 +127,12 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
 	// constructing the etcd v3 client blocks and times out if etcd is not available.
 	// retry in a loop in the background until we successfully create the client, storing the client or error encountered
-	lock := sync.Mutex{}
-	var client *clientv3.Client
+	lock := sync.RWMutex{}
+	var prober *etcd3Prober
 	clientErr := fmt.Errorf("etcd client connection not yet established")
 
 	go wait.PollUntil(time.Second, func() (bool, error) {
-		newClient, err := newETCD3Client(c.Transport)
-
+		newProber, err := newETCD3Prober(c)
 		lock.Lock()
 		defer lock.Unlock()
@@ -141,7 +140,7 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
 		select {
 		case <-stopCh:
 			if err == nil {
-				newClient.Close()
+				newProber.Close()
 			}
 			return true, nil
 		default:
@@ -151,7 +150,7 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
 			clientErr = err
 			return false, nil
 		}
-		client = newClient
+		prober = newProber
 		clientErr = nil
 		return true, nil
 	}, stopCh)
@@ -163,8 +162,8 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
 		lock.Lock()
 		defer lock.Unlock()
-		if client != nil {
-			client.Close()
+		if prober != nil {
+			prober.Close()
 			clientErr = fmt.Errorf("server is shutting down")
 		}
 	}()
@@ -183,7 +182,7 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
 		ctx, cancel := context.WithTimeout(context.Background(), timeout)
 		defer cancel()
 		// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
-		_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
+		err := prober.Probe(ctx)
 		if err == nil {
 			return nil
 		}
@@ -191,6 +190,49 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
 	}, nil
 }
 
+func newETCD3Prober(c storagebackend.Config) (*etcd3Prober, error) {
+	client, err := newETCD3Client(c.Transport)
+	if err != nil {
+		return nil, err
+	}
+	return &etcd3Prober{
+		client: client,
+		prefix: c.Prefix,
+	}, nil
+}
+
+type etcd3Prober struct {
+	prefix string
+
+	mux    sync.RWMutex
+	client *clientv3.Client
+
+	closed bool
+}
+
+func (p *etcd3Prober) Close() error {
+	p.mux.Lock()
+	defer p.mux.Unlock()
+	if !p.closed {
+		p.closed = true
+		return p.client.Close()
+	}
+	return fmt.Errorf("prober was closed")
+}
+
+func (p *etcd3Prober) Probe(ctx context.Context) error {
+	p.mux.RLock()
+	defer p.mux.RUnlock()
+	if p.closed {
+		return fmt.Errorf("prober was closed")
+	}
+	// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
+	_, err := p.client.Get(ctx, path.Join("/", p.prefix, "health"))
+	if err != nil {
+		return fmt.Errorf("error getting data from etcd: %w", err)
+	}
+	return nil
+}
+
 var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
 	tlsInfo := transport.TLSInfo{
 		CertFile: c.CertFile,
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
index 4c8a409d659c..c8cdd19b97a6 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
@@ -17,6 +17,7 @@ limitations under the License.
 package factory
 
 import (
+	"context"
 	"fmt"
 
 	"k8s.io/apimachinery/pkg/runtime"
@@ -61,3 +62,20 @@ func CreateReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
 		return nil, fmt.Errorf("unknown storage type: %s", c.Type)
 	}
 }
+
+func CreateProber(c storagebackend.Config) (Prober, error) {
+	switch c.Type {
+	case storagebackend.StorageTypeETCD2:
+		return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
+	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
+		return newETCD3Prober(c)
+	default:
+		return nil, fmt.Errorf("unknown storage type: %s", c.Type)
+	}
+}
+
+// Prober is an interface that defines the Probe function for doing etcd readiness/liveness checks.
+type Prober interface {
+	Probe(ctx context.Context) error
+	Close() error
+}
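
Usage sketch (supplementary note, not part of the patch): the snippet below exercises the prober surface this patch introduces, factory.CreateProber plus Prober.Probe and Prober.Close, which is the same path componentstatus.EtcdServer.DoServerCheck takes above. The etcd endpoint, key prefix, and timeout are illustrative assumptions, as is the standalone main wrapper.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apiserver/pkg/storage/storagebackend"
	"k8s.io/apiserver/pkg/storage/storagebackend/factory"
)

func main() {
	// Assumed single-member etcd at a local address; adjust for a real cluster.
	cfg := storagebackend.Config{
		Type:   storagebackend.StorageTypeETCD3,
		Prefix: "/registry",
		Transport: storagebackend.TransportConfig{
			ServerList: []string{"https://127.0.0.1:2379"},
		},
	}

	// CreateProber constructs an etcd client from the transport config.
	prober, err := factory.CreateProber(cfg)
	if err != nil {
		fmt.Println("constructing prober:", err)
		return
	}
	defer prober.Close()

	// Probe performs a ranged Get on <prefix>/health, mirroring the
	// health and readiness checks in etcd3.go above.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := prober.Probe(ctx); err != nil {
		fmt.Println("etcd probe failed:", err)
		return
	}
	fmt.Println("etcd probe succeeded")
}

Because Probe issues a KV read through the storage client rather than parsing the etcd HTTP /health endpoint, component status now reports exactly what the apiserver's own storage health checks see, which is the consistency the subject line refers to.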