Skip to content

Commit

Permalink
Use actual etcd client for /healthz/etcd checks
Browse files Browse the repository at this point in the history
  • Loading branch information
liggitt committed Jun 13, 2018
1 parent 6decdaa commit b39cd00
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 28 deletions.
4 changes: 0 additions & 4 deletions staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/server/options/BUILD
Expand Up @@ -52,8 +52,8 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/resourceconfig:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
Expand Down
27 changes: 14 additions & 13 deletions staging/src/k8s.io/apiserver/pkg/server/options/etcd.go
Expand Up @@ -32,8 +32,8 @@ import (
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/etcd3/preflight"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
)

type EtcdOptions struct {
Expand Down Expand Up @@ -181,29 +181,30 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
if s == nil {
return nil
}

s.addEtcdHealthEndpoint(c)
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
return nil
}

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
s.addEtcdHealthEndpoint(c)
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}

func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) {
func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
if err != nil {
return err
}
c.HealthzChecks = append(c.HealthzChecks, healthz.NamedCheck("etcd", func(r *http.Request) error {
done, err := preflight.EtcdConnection{ServerList: s.StorageConfig.ServerList}.CheckEtcdServers()
if !done {
return fmt.Errorf("etcd failed")
}
if err != nil {
return err
}
return nil
return healthCheck()
}))
return nil
}

type SimpleRestOptionsFactory struct {
Expand Down
Expand Up @@ -37,6 +37,7 @@ go_library(
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
Expand Down
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package factory

import (
"context"
"fmt"
"net"
"net/http"
"time"
Expand All @@ -30,6 +32,29 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
)

func newETCD2HealthCheck(c storagebackend.Config) (func() error, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
}

client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, err
}

members := etcd2client.NewMembersAPI(client)

return func() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := members.List(ctx); err != nil {
return fmt.Errorf("error listing etcd members: %v", err)
}
return nil
}, nil
}

func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
Expand Down
Expand Up @@ -18,11 +18,14 @@ package factory

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend"
Expand All @@ -38,15 +41,49 @@ var (
dialTimeout = 10 * time.Second
)

func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
// constructing the etcd v3 client blocks and times out if etcd is not available.
// retry in a loop in the background until we successfully create the client, storing the client or error encountered

clientValue := &atomic.Value{}

clientErrMsg := &atomic.Value{}
clientErrMsg.Store("etcd client connection not yet established")

go wait.PollUntil(time.Second, func() (bool, error) {
client, err := newETCD3Client(c)
if err != nil {
clientErrMsg.Store(err.Error())
return false, nil
}
clientValue.Store(client)
clientErrMsg.Store("")
return true, nil
}, wait.NeverStop)

return func() error {
if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 {
return fmt.Errorf(errMsg)
}
client := clientValue.Load().(*clientv3.Client)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := client.Cluster.MemberList(ctx); err != nil {
return fmt.Errorf("error listing etcd members: %v", err)
}
return nil
}, nil
}

func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, nil, err
return nil, err
}
// NOTE: Client relies on nil tlsConfig
// for non-secure connections, update the implicit variable
Expand All @@ -61,6 +98,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
TLS: tlsConfig,
}
client, err := clientv3.New(cfg)
return client, err
}

func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
client, err := newETCD3Client(c)
if err != nil {
return nil, nil, err
}
Expand Down
Expand Up @@ -41,3 +41,15 @@ func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

// CreateHealthCheck creates a healthcheck function based on given config.
func CreateHealthCheck(c storagebackend.Config) (func() error, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return newETCD2HealthCheck(c)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3HealthCheck(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
4 changes: 0 additions & 4 deletions staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b39cd00

Please sign in to comment.