Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #118460: Make etcd component status consistent with health probes #119039

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/registry/core/componentstatus/rest.go
Expand Up @@ -38,12 +38,12 @@ import (
)

type REST struct {
GetServersToValidate func() map[string]*Server
GetServersToValidate func() map[string]Server
rest.TableConvertor
}

// NewStorage returns a new REST.
func NewStorage(serverRetriever func() map[string]*Server) *REST {
func NewStorage(serverRetriever func() map[string]Server) *REST {
return &REST{
GetServersToValidate: serverRetriever,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
Expand Down Expand Up @@ -77,7 +77,7 @@ func (rs *REST) List(ctx context.Context, options *metainternalversion.ListOptio
wait.Add(len(servers))
statuses := make(chan api.ComponentStatus, len(servers))
for k, v := range servers {
go func(name string, server *Server) {
go func(name string, server Server) {
defer wait.Done()
status := rs.getComponentStatus(name, server)
statuses <- *status
Expand Down Expand Up @@ -148,7 +148,7 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus {
}
}

func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus {
func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus {
status, msg, err := server.DoServerCheck()
errorMsg := ""
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/registry/core/componentstatus/rest_test.go
Expand Up @@ -60,9 +60,9 @@ func NewTestREST(resp testResponse) *REST {
err: resp.err,
}
return &REST{
GetServersToValidate: func() map[string]*Server {
return map[string]*Server{
"test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
GetServersToValidate: func() map[string]Server {
return map[string]Server{
"test1": &HttpServer{Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
}
},
}
Expand Down
31 changes: 29 additions & 2 deletions pkg/registry/core/componentstatus/validator.go
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package componentstatus

import (
"context"
"crypto/tls"
"sync"
"time"

utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/kubernetes/pkg/probe"
httpprober "k8s.io/kubernetes/pkg/probe/http"
)
Expand All @@ -32,7 +35,11 @@ const (

type ValidatorFn func([]byte) error

type Server struct {
// Server is a component whose health can be checked. Anything providing
// DoServerCheck can be registered for component status validation.
type Server interface {
// DoServerCheck runs the health check and returns the probe result, a
// message string, and an error.
DoServerCheck() (probe.Result, string, error)
}

type HttpServer struct {
Addr string
Port int
Path string
Expand All @@ -56,7 +63,7 @@ type ServerStatus struct {
Err string `json:"err,omitempty"`
}

func (server *Server) DoServerCheck() (probe.Result, string, error) {
func (server *HttpServer) DoServerCheck() (probe.Result, string, error) {
// setup the prober
server.Once.Do(func() {
if server.Prober != nil {
Expand Down Expand Up @@ -87,3 +94,23 @@ func (server *Server) DoServerCheck() (probe.Result, string, error) {
}
return result, data, nil
}

// EtcdServer is a Server whose health check is driven by a storage backend
// configuration rather than by an HTTP endpoint.
type EtcdServer struct {
// Config is embedded; DoServerCheck hands it to factory.CreateProber.
storagebackend.Config
}

// DoServerCheck probes the storage backend described by server.Config.
//
// A new prober is created through factory.CreateProber on every call and is
// closed before the method returns. The probe itself runs under a context
// that is cancelled after probeTimeOut.
//
// It returns probe.Failure together with the underlying error when the prober
// cannot be created or the probe fails, and probe.Success with a nil error
// otherwise. The message string is always empty.
func (server *EtcdServer) DoServerCheck() (probe.Result, string, error) {
	etcdProber, err := factory.CreateProber(server.Config)
	if err != nil {
		return probe.Failure, "", err
	}
	defer etcdProber.Close()

	ctx, cancel := context.WithTimeout(context.Background(), probeTimeOut)
	defer cancel()

	if probeErr := etcdProber.Probe(ctx); probeErr != nil {
		return probe.Failure, "", probeErr
	}
	// The probe succeeded, so there is no error to report.
	return probe.Success, "", nil
}
2 changes: 1 addition & 1 deletion pkg/registry/core/componentstatus/validator_test.go
Expand Up @@ -49,7 +49,7 @@ func TestValidate(t *testing.T) {
{probe.Success, "foo", nil, probe.Success, "foo", false, nil},
}

s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}
s := HttpServer{Addr: "foo.com", Port: 8080, Path: "/healthz"}

for _, test := range tests {
fakeProber := &fakeHttpProber{
Expand Down
45 changes: 6 additions & 39 deletions pkg/registry/core/rest/storage_core.go
Expand Up @@ -21,12 +21,8 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"

"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
Expand All @@ -35,7 +31,6 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
Expand Down Expand Up @@ -65,7 +60,6 @@ import (
serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/serviceaccount"
utilsnet "k8s.io/utils/net"
)

// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
Expand Down Expand Up @@ -387,43 +381,16 @@ type componentStatusStorage struct {
storageFactory serverstorage.StorageFactory
}

func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server {
func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server {
// this is fragile, which assumes that the default port is being used
// TODO: switch to secure port until these components remove the ability to serve insecurely.
serversToValidate := map[string]*componentstatus.Server{
"controller-manager": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
"scheduler": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
serversToValidate := map[string]componentstatus.Server{
"controller-manager": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
"scheduler": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
}

for ix, machine := range s.storageFactory.Backends() {
etcdUrl, err := url.Parse(machine.Server)
if err != nil {
klog.Errorf("Failed to parse etcd url for validation: %v", err)
continue
}
var port int
var addr string
if strings.Contains(etcdUrl.Host, ":") {
var portString string
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
if err != nil {
klog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
continue
}
port, _ = utilsnet.ParsePort(portString, true)
} else {
addr = etcdUrl.Host
port = 2379
}
// TODO: etcd health checking should be abstracted in the storage tier
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{
Addr: addr,
EnableHTTPS: etcdUrl.Scheme == "https",
TLSConfig: machine.TLSConfig,
Port: port,
Path: "/health",
Validate: etcd3.EtcdHealthCheck,
}
for ix, cfg := range s.storageFactory.Configs() {
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.EtcdServer{Config: cfg}
}
return serversToValidate
}
4 changes: 4 additions & 0 deletions pkg/registry/core/rest/storage_core_test.go
Expand Up @@ -51,3 +51,7 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) s
// Backends returns a fixed single-element list whose only backend has the
// server "etcd-0".
func (f fakeStorageFactory) Backends() []storage.Backend {
return []storage.Backend{{Server: "etcd-0"}}
}

// Configs returns a fixed single-element list whose only config has the
// transport server list ["etcd-0"].
func (f fakeStorageFactory) Configs() []storagebackend.Config {
return []storagebackend.Config{{Transport: storagebackend.TransportConfig{ServerList: []string{"etcd-0"}}}}
}
69 changes: 60 additions & 9 deletions staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go
Expand Up @@ -22,15 +22,14 @@ import (
"io/ioutil"
"strings"

"k8s.io/klog/v2"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
)

// Backend describes the storage servers, the information here should be enough
Expand All @@ -53,8 +52,12 @@ type StorageFactory interface {
// centralized control over the shape of etcd directories
ResourcePrefix(groupResource schema.GroupResource) string

// Configs gets configurations for all of registered storage destinations.
Configs() []storagebackend.Config

// Backends gets all backends for all registered storage destinations.
// Used for getting all instances for health validations.
// Deprecated: Use Configs instead
Backends() []Backend
}

Expand Down Expand Up @@ -288,28 +291,76 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
return storageConfig.ForResource(groupResource), nil
}

// Configs implements StorageFactory. It returns the configurations derived
// from s.StorageConfig together with those for the etcd locations named in
// s.Overrides.
func (s *DefaultStorageFactory) Configs() []storagebackend.Config {
return configs(s.StorageConfig, s.Overrides)
}

// Configs gets configurations for all registered storage destinations of
// storageConfig. No group resource overrides are taken into account.
func Configs(storageConfig storagebackend.Config) []storagebackend.Config {
return configs(storageConfig, nil)
}

// configs expands storageConfig into a list of single-server configurations.
//
// Every entry of storageConfig.Transport.ServerList yields one copy of
// storageConfig restricted to that server, in list order. After those, every
// etcd location named by a group resource override that has not been emitted
// yet yields one more copy, to which the override is applied (with an empty
// StorageCodecConfig) before the server list is narrowed to that location.
//
// grOverrides is a map, so the relative order of configurations that come from
// different overrides follows Go's map iteration order.
//
// The returned slice is never nil.
func configs(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []storagebackend.Config {
	seen := sets.NewString()
	result := []storagebackend.Config{}

	for _, server := range storageConfig.Transport.ServerList {
		// Assigning the struct copies it, so the input's server list is left alone.
		perServer := storageConfig
		perServer.Transport.ServerList = []string{server}
		result = append(result, perServer)
		seen.Insert(server)
	}

	for _, override := range grOverrides {
		for _, server := range override.etcdLocation {
			if !seen.Has(server) {
				perServer := storageConfig
				// Apply runs first; the server list is then pinned to this one location.
				override.Apply(&perServer, &StorageCodecConfig{})
				perServer.Transport.ServerList = []string{server}
				result = append(result, perServer)
				seen.Insert(server)
			}
		}
	}
	return result
}

// Backends implements StorageFactory. It returns the backends for the servers
// of s.StorageConfig together with the etcd locations named in s.Overrides.
func (s *DefaultStorageFactory) Backends() []Backend {
return backends(s.StorageConfig, s.Overrides)
}

// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []Backend {
servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
func Backends(storageConfig storagebackend.Config) []Backend {
// nil overrides: only storageConfig's own servers are considered.
return backends(storageConfig, nil)
}

func backends(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []Backend {
servers := sets.NewString(storageConfig.Transport.ServerList...)

for _, overrides := range s.Overrides {
for _, overrides := range grOverrides {
servers.Insert(overrides.etcdLocation...)
}

tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
if len(storageConfig.Transport.CertFile) > 0 && len(storageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(storageConfig.Transport.CertFile, storageConfig.Transport.KeyFile)
if err != nil {
klog.Errorf("failed to load key pair while getting backends: %s", err)
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
if len(s.StorageConfig.Transport.TrustedCAFile) > 0 {
if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil {
if len(storageConfig.Transport.TrustedCAFile) > 0 {
if caCert, err := ioutil.ReadFile(storageConfig.Transport.TrustedCAFile); err != nil {
klog.Errorf("failed to read ca file while getting backends: %s", err)
} else {
caPool := x509.NewCertPool()
Expand Down
Expand Up @@ -185,3 +185,59 @@ func TestUpdateEtcdOverrides(t *testing.T) {

}
}

// TestConfigs checks that DefaultStorageFactory.Configs returns one
// single-server configuration per distinct etcd location: the default
// locations first, followed by any additional locations registered for a
// group resource through SetEtcdLocation. Locations that repeat a default one
// must not be listed twice.
func TestConfigs(t *testing.T) {
	exampleinstall.Install(scheme)
	defaultEtcdLocations := []string{"http://127.0.0.1", "http://127.0.0.2"}

	// expect builds the configuration wanted for a single etcd location.
	expect := func(location string) storagebackend.Config {
		return storagebackend.Config{
			Transport: storagebackend.TransportConfig{ServerList: []string{location}},
			Prefix:    "/registry",
			Paging:    true,
		}
	}
	exampleResource := schema.GroupResource{Group: example.GroupName, Resource: "resource"}

	testCases := []struct {
		resource    schema.GroupResource
		servers     []string
		wantConfigs []storagebackend.Config
	}{
		{
			// No override: only the default locations.
			wantConfigs: []storagebackend.Config{
				expect("http://127.0.0.1"),
				expect("http://127.0.0.2"),
			},
		},
		{
			// One override location that is not among the defaults.
			resource: exampleResource,
			servers:  []string{"http://127.0.0.1:10000"},
			wantConfigs: []storagebackend.Config{
				expect("http://127.0.0.1"),
				expect("http://127.0.0.2"),
				expect("http://127.0.0.1:10000"),
			},
		},
		{
			// "http://127.0.0.2" repeats a default location and must be skipped.
			resource: exampleResource,
			servers:  []string{"http://127.0.0.1:10000", "https://127.0.0.1", "http://127.0.0.2"},
			wantConfigs: []storagebackend.Config{
				expect("http://127.0.0.1"),
				expect("http://127.0.0.2"),
				expect("http://127.0.0.1:10000"),
				expect("https://127.0.0.1"),
			},
		},
	}

	for i, tc := range testCases {
		baseConfig := storagebackend.Config{
			Prefix:    "/registry",
			Transport: storagebackend.TransportConfig{ServerList: defaultEtcdLocations},
		}
		sf := NewDefaultStorageFactory(baseConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil)
		if len(tc.servers) != 0 {
			sf.SetEtcdLocation(tc.resource, tc.servers)
		}

		if got := sf.Configs(); !reflect.DeepEqual(tc.wantConfigs, got) {
			t.Errorf("%d: expected %v, got %v", i, tc.wantConfigs, got)
		}
	}
}
Expand Up @@ -28,6 +28,7 @@ type etcdHealth struct {
}

// EtcdHealthCheck decodes data returned from the etcd /health handler.
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
Expand Down