Skip to content

Commit

Permalink
Merge pull request kubernetes#118460 from serathius/componentstatuses
Browse files Browse the repository at this point in the history
Make etcd component status consistent with health probes
  • Loading branch information
k8s-ci-robot committed Jun 13, 2023
2 parents 96b08af + a60314c commit 3b44969
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 64 deletions.
8 changes: 4 additions & 4 deletions pkg/registry/core/componentstatus/rest.go
Expand Up @@ -38,12 +38,12 @@ import (
)

type REST struct {
GetServersToValidate func() map[string]*Server
GetServersToValidate func() map[string]Server
rest.TableConvertor
}

// NewStorage returns a new REST.
func NewStorage(serverRetriever func() map[string]*Server) *REST {
func NewStorage(serverRetriever func() map[string]Server) *REST {
return &REST{
GetServersToValidate: serverRetriever,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
Expand Down Expand Up @@ -83,7 +83,7 @@ func (rs *REST) List(ctx context.Context, options *metainternalversion.ListOptio
wait.Add(len(servers))
statuses := make(chan api.ComponentStatus, len(servers))
for k, v := range servers {
go func(name string, server *Server) {
go func(name string, server Server) {
defer wait.Done()
status := rs.getComponentStatus(name, server)
statuses <- *status
Expand Down Expand Up @@ -153,7 +153,7 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus {
}
}

func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus {
func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus {
status, msg, err := server.DoServerCheck()
errorMsg := ""
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/registry/core/componentstatus/rest_test.go
Expand Up @@ -59,9 +59,9 @@ func NewTestREST(resp testResponse) *REST {
err: resp.err,
}
return &REST{
GetServersToValidate: func() map[string]*Server {
return map[string]*Server{
"test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
GetServersToValidate: func() map[string]Server {
return map[string]Server{
"test1": &HttpServer{Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
}
},
}
Expand Down
31 changes: 29 additions & 2 deletions pkg/registry/core/componentstatus/validator.go
Expand Up @@ -17,12 +17,15 @@ limitations under the License.
package componentstatus

import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"

utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/kubernetes/pkg/probe"
httpprober "k8s.io/kubernetes/pkg/probe/http"
)
Expand All @@ -33,7 +36,11 @@ const (

type ValidatorFn func([]byte) error

type Server struct {
type Server interface {
DoServerCheck() (probe.Result, string, error)
}

type HttpServer struct {
Addr string
Port int
Path string
Expand All @@ -57,7 +64,7 @@ type ServerStatus struct {
Err string `json:"err,omitempty"`
}

func (server *Server) DoServerCheck() (probe.Result, string, error) {
func (server *HttpServer) DoServerCheck() (probe.Result, string, error) {
// setup the prober
server.Once.Do(func() {
if server.Prober != nil {
Expand Down Expand Up @@ -92,3 +99,23 @@ func (server *Server) DoServerCheck() (probe.Result, string, error) {
}
return result, data, nil
}

type EtcdServer struct {
storagebackend.Config
}

func (server *EtcdServer) DoServerCheck() (probe.Result, string, error) {
prober, err := factory.CreateProber(server.Config)
if err != nil {
return probe.Failure, "", err
}
defer prober.Close()

ctx, cancel := context.WithTimeout(context.Background(), probeTimeOut)
defer cancel()
err = prober.Probe(ctx)
if err != nil {
return probe.Failure, "", err
}
return probe.Success, "", err
}
2 changes: 1 addition & 1 deletion pkg/registry/core/componentstatus/validator_test.go
Expand Up @@ -49,7 +49,7 @@ func TestValidate(t *testing.T) {
{probe.Success, "foo", nil, probe.Success, "foo", false, nil},
}

s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}
s := HttpServer{Addr: "foo.com", Port: 8080, Path: "/healthz"}

for _, test := range tests {
fakeProber := &fakeHttpProber{
Expand Down
45 changes: 6 additions & 39 deletions pkg/registry/core/rest/storage_core.go
Expand Up @@ -21,12 +21,8 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"

"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
Expand All @@ -35,7 +31,6 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
Expand Down Expand Up @@ -69,7 +64,6 @@ import (
serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/serviceaccount"
utilsnet "k8s.io/utils/net"
)

// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
Expand Down Expand Up @@ -416,43 +410,16 @@ type componentStatusStorage struct {
storageFactory serverstorage.StorageFactory
}

func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server {
func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server {
// this is fragile, which assumes that the default port is being used
// TODO: switch to secure port until these components remove the ability to serve insecurely.
serversToValidate := map[string]*componentstatus.Server{
"controller-manager": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
"scheduler": {EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
serversToValidate := map[string]componentstatus.Server{
"controller-manager": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
"scheduler": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
}

for ix, machine := range s.storageFactory.Backends() {
etcdUrl, err := url.Parse(machine.Server)
if err != nil {
klog.Errorf("Failed to parse etcd url for validation: %v", err)
continue
}
var port int
var addr string
if strings.Contains(etcdUrl.Host, ":") {
var portString string
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
if err != nil {
klog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
continue
}
port, _ = utilsnet.ParsePort(portString, true)
} else {
addr = etcdUrl.Host
port = 2379
}
// TODO: etcd health checking should be abstracted in the storage tier
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{
Addr: addr,
EnableHTTPS: etcdUrl.Scheme == "https",
TLSConfig: machine.TLSConfig,
Port: port,
Path: "/health",
Validate: etcd3.EtcdHealthCheck,
}
for ix, cfg := range s.storageFactory.Configs() {
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.EtcdServer{Config: cfg}
}
return serversToValidate
}
4 changes: 4 additions & 0 deletions pkg/registry/core/rest/storage_core_test.go
Expand Up @@ -51,3 +51,7 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) s
func (f fakeStorageFactory) Backends() []storage.Backend {
return []storage.Backend{{Server: "etcd-0"}}
}

func (f fakeStorageFactory) Configs() []storagebackend.Config {
return []storagebackend.Config{{Transport: storagebackend.TransportConfig{ServerList: []string{"etcd-0"}}}}
}
8 changes: 8 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/options/etcd.go
Expand Up @@ -433,6 +433,10 @@ func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) str
return resource.Group + "/" + resource.Resource
}

func (s *SimpleStorageFactory) Configs() []storagebackend.Config {
return serverstorage.Configs(s.StorageConfig)
}

func (s *SimpleStorageFactory) Backends() []serverstorage.Backend {
// nothing should ever call this method but we still provide a functional implementation
return serverstorage.Backends(s.StorageConfig)
Expand Down Expand Up @@ -463,6 +467,10 @@ func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource
return t.delegate.ResourcePrefix(resource)
}

func (t *transformerStorageFactory) Configs() []storagebackend.Config {
return t.delegate.Configs()
}

func (t *transformerStorageFactory) Backends() []serverstorage.Backend {
return t.delegate.Backends()
}
49 changes: 45 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go
Expand Up @@ -22,14 +22,13 @@ import (
"io/ioutil"
"strings"

"k8s.io/klog/v2"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
)

// Backend describes the storage servers, the information here should be enough
Expand All @@ -52,8 +51,12 @@ type StorageFactory interface {
// centralized control over the shape of etcd directories
ResourcePrefix(groupResource schema.GroupResource) string

// Configs gets configurations for all of registered storage destinations.
Configs() []storagebackend.Config

// Backends gets all backends for all registered storage destinations.
// Used for getting all instances for health validations.
// Deprecated: Use Configs instead
Backends() []Backend
}

Expand Down Expand Up @@ -276,14 +279,52 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
return storageConfig.ForResource(groupResource), nil
}

// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
// Configs implements StorageFactory.
func (s *DefaultStorageFactory) Configs() []storagebackend.Config {
return configs(s.StorageConfig, s.Overrides)
}

// Configs gets configurations for all of registered storage destinations.
func Configs(storageConfig storagebackend.Config) []storagebackend.Config {
return configs(storageConfig, nil)
}

// Returns all storage configurations including those for group resource overrides
func configs(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []storagebackend.Config {
locations := sets.NewString()
configs := []storagebackend.Config{}
for _, loc := range storageConfig.Transport.ServerList {
// copy
newConfig := storageConfig
newConfig.Transport.ServerList = []string{loc}
configs = append(configs, newConfig)
locations.Insert(loc)
}

for _, override := range grOverrides {
for _, loc := range override.etcdLocation {
if locations.Has(loc) {
continue
}
// copy
newConfig := storageConfig
override.Apply(&newConfig, &StorageCodecConfig{})
newConfig.Transport.ServerList = []string{loc}
configs = append(configs, newConfig)
locations.Insert(loc)
}
}
return configs
}

// Backends implements StorageFactory.
func (s *DefaultStorageFactory) Backends() []Backend {
return backends(s.StorageConfig, s.Overrides)
}

// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
func Backends(storageConfig storagebackend.Config) []Backend {
return backends(storageConfig, nil)
}
Expand Down
Expand Up @@ -185,3 +185,59 @@ func TestUpdateEtcdOverrides(t *testing.T) {

}
}

func TestConfigs(t *testing.T) {
exampleinstall.Install(scheme)
defaultEtcdLocations := []string{"http://127.0.0.1", "http://127.0.0.2"}

testCases := []struct {
resource schema.GroupResource
servers []string
wantConfigs []storagebackend.Config
}{
{
wantConfigs: []storagebackend.Config{
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
},
},
{
resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"},
servers: []string{"http://127.0.0.1:10000"},
wantConfigs: []storagebackend.Config{
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true},
},
},
{
resource: schema.GroupResource{Group: example.GroupName, Resource: "resource"},
servers: []string{"http://127.0.0.1:10000", "https://127.0.0.1", "http://127.0.0.2"},
wantConfigs: []storagebackend.Config{
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1"}}, Prefix: "/registry", Paging: true},
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.2"}}, Prefix: "/registry", Paging: true},
{Transport: storagebackend.TransportConfig{ServerList: []string{"http://127.0.0.1:10000"}}, Prefix: "/registry", Paging: true},
{Transport: storagebackend.TransportConfig{ServerList: []string{"https://127.0.0.1"}}, Prefix: "/registry", Paging: true},
},
},
}

for i, test := range testCases {
defaultConfig := storagebackend.Config{
Prefix: "/registry",
Transport: storagebackend.TransportConfig{
ServerList: defaultEtcdLocations,
},
}
storageFactory := NewDefaultStorageFactory(defaultConfig, "", codecs, NewDefaultResourceEncodingConfig(scheme), NewResourceConfig(), nil)
if len(test.servers) > 0 {
storageFactory.SetEtcdLocation(test.resource, test.servers)
}

got := storageFactory.Configs()
if !reflect.DeepEqual(test.wantConfigs, got) {
t.Errorf("%d: expected %v, got %v", i, test.wantConfigs, got)
continue
}
}
}
Expand Up @@ -28,6 +28,7 @@ type etcdHealth struct {
}

// EtcdHealthCheck decodes data returned from etcd /healthz handler.
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
Expand Down

0 comments on commit 3b44969

Please sign in to comment.