Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove static kubelet client, refactor ConnectionInfoGetter #34474

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 1 addition & 7 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
genericvalidation "k8s.io/kubernetes/pkg/genericapiserver/validation"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/serviceaccount"
Expand Down Expand Up @@ -137,11 +136,6 @@ func Run(s *options.APIServer) error {
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}

kubeletClient, err := kubeletclient.NewStaticKubeletClient(&s.KubeletConfig)
if err != nil {
glog.Fatalf("Failed to start kubelet client: %v", err)
}

if s.StorageConfig.DeserializationCacheSize == 0 {
// When size of cache is not explicitly set, estimate its size based on
// target memory usage.
Expand Down Expand Up @@ -316,7 +310,7 @@ func Run(s *options.APIServer) error {
EnableCoreControllers: true,
DeleteCollectionWorkers: s.DeleteCollectionWorkers,
EventTTL: s.EventTTL,
KubeletClient: kubeletClient,
KubeletClientConfig: s.KubeletConfig,
EnableUISupport: true,
EnableLogsSupport: true,

Expand Down
131 changes: 80 additions & 51 deletions pkg/kubelet/client/kubelet_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@ limitations under the License.
package client

import (
"errors"
"fmt"
"net"
"net/http"
"strings"
"strconv"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/transport"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)

type KubeletClientConfig struct {
Expand All @@ -50,19 +48,17 @@ type KubeletClientConfig struct {
Dial func(net, addr string) (net.Conn, error)
}

// KubeletClient is an interface for all kubelet functionality
type KubeletClient interface {
GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, port uint, transport http.RoundTripper, err error)
// ConnectionInfo provides the information needed to connect to a kubelet
type ConnectionInfo struct {
Scheme string
Hostname string
Port string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do wonder if things are easier if Port is an integer type

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's used for a comparison to a string in one place, and JoinHostPort (which expects string) in four places, so I'd prefer to leave it as a string

Transport http.RoundTripper
}

// ConnectionInfoGetter provides ConnectionInfo for the kubelet running on a named node
type ConnectionInfoGetter interface {
GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, host string, port uint, transport http.RoundTripper, err error)
}

// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP.
type HTTPKubeletClient struct {
Client *http.Client
Config *KubeletClientConfig
GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (*ConnectionInfo, error)
}

func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
Expand All @@ -82,43 +78,6 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
return transport.HTTPWrappersForConfig(config.transportConfig(), rt)
}

// TODO: this structure is questionable, it should be using client.Config and overriding defaults.
func NewStaticKubeletClient(config *KubeletClientConfig) (KubeletClient, error) {
transport, err := MakeTransport(config)
if err != nil {
return nil, err
}
c := &http.Client{
Transport: transport,
Timeout: config.HTTPTimeout,
}
return &HTTPKubeletClient{
Client: c,
Config: config,
}, nil
}

// In default HTTPKubeletClient ctx is unused.
func (c *HTTPKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
if errs := validation.ValidateNodeName(string(nodeName), false); len(errs) != 0 {
return "", 0, nil, fmt.Errorf("invalid node name: %s", strings.Join(errs, ";"))
}
scheme := "http"
if c.Config.EnableHttps {
scheme = "https"
}
return scheme, c.Config.Port, c.Client.Transport, nil
}

// FakeKubeletClient is a fake implementation of KubeletClient which returns an error
// when called. It is useful to pass to the master in a test configuration with
// no kubelets.
type FakeKubeletClient struct{}

func (c FakeKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}

// transportConfig converts a client config to an appropriate transport config.
func (c *KubeletClientConfig) transportConfig() *transport.Config {
cfg := &transport.Config{
Expand All @@ -137,3 +96,73 @@ func (c *KubeletClientConfig) transportConfig() *transport.Config {
}
return cfg
}

// NodeGetter defines an interface for looking up a node by name
type NodeGetter interface {
Get(name string) (*api.Node, error)
}

// NodeGetterFunc allows implementing NodeGetter with a function
type NodeGetterFunc func(name string) (*api.Node, error)

func (f NodeGetterFunc) Get(name string) (*api.Node, error) {
return f(name)
}

// NodeConnectionInfoGetter obtains connection info from the status of a Node API object
type NodeConnectionInfoGetter struct {
// nodes is used to look up Node objects
nodes NodeGetter
// scheme is the scheme to use to connect to all kubelets
scheme string
// defaultPort is the port to use if no Kubelet endpoint port is recorded in the node status
defaultPort int
// transport is the transport to use to send a request to all kubelets
transport http.RoundTripper
}

func NewNodeConnectionInfoGetter(nodes NodeGetter, config KubeletClientConfig) (ConnectionInfoGetter, error) {
scheme := "http"
if config.EnableHttps {
scheme = "https"
}

transport, err := MakeTransport(&config)
if err != nil {
return nil, err
}

return &NodeConnectionInfoGetter{
nodes: nodes,
scheme: scheme,
defaultPort: int(config.Port),
transport: transport,
}, nil
}

func (k *NodeConnectionInfoGetter) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (*ConnectionInfo, error) {
node, err := k.nodes.Get(string(nodeName))
if err != nil {
return nil, err
}

// Find a kubelet-reported address, using preferred address type
hostIP, err := nodeutil.GetNodeHostIP(node)
if err != nil {
return nil, err
}
host := hostIP.String()

// Use the kubelet-reported port, if present
port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if port <= 0 {
port = k.defaultPort
}

return &ConnectionInfo{
Scheme: k.scheme,
Hostname: host,
Port: strconv.Itoa(port),
Transport: k.transport,
}, nil
}
80 changes: 12 additions & 68 deletions pkg/kubelet/client/kubelet_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,17 @@ limitations under the License.
package client

import (
"encoding/json"
"net/http/httptest"
"net/url"
"testing"

"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/probe"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
)

func TestHTTPKubeletClient(t *testing.T) {
expectObj := probe.Success
body, err := json.Marshal(expectObj)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()

if _, err := url.Parse(testServer.URL); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
// Ensure a node client can be used as a NodeGetter.
// This allows anyone with a node client to easily construct a NewNodeConnectionInfoGetter.
var _ = NodeGetter(unversioned.NodeInterface(nil))

func TestNewKubeletClient(t *testing.T) {
config := &KubeletClientConfig{
EnableHttps: false,
}

client, err := NewStaticKubeletClient(config)
if err != nil {
t.Errorf("Error while trying to create a client: %v", err)
}
if client == nil {
t.Error("client is nil.")
}
}

func TestNewKubeletClientTLSInvalid(t *testing.T) {
func TestMakeTransportInvalid(t *testing.T) {
config := &KubeletClientConfig{
EnableHttps: true,
//Invalid certificate and key path
Expand All @@ -71,16 +38,16 @@ func TestNewKubeletClientTLSInvalid(t *testing.T) {
},
}

client, err := NewStaticKubeletClient(config)
rt, err := MakeTransport(config)
if err == nil {
t.Errorf("Expected an error")
}
if client != nil {
t.Error("client should be nil as we provided invalid cert file")
if rt != nil {
t.Error("rt should be nil as we provided invalid cert file")
}
}

func TestNewKubeletClientTLSValid(t *testing.T) {
func TestMakeTransportValid(t *testing.T) {
config := &KubeletClientConfig{
Port: 1234,
EnableHttps: true,
Expand All @@ -93,34 +60,11 @@ func TestNewKubeletClientTLSValid(t *testing.T) {
},
}

client, err := NewStaticKubeletClient(config)
rt, err := MakeTransport(config)
if err != nil {
t.Errorf("Not expecting an error #%v", err)
}
if client == nil {
t.Error("client should not be nil")
}

{
scheme, port, transport, err := client.GetRawConnectionInfo(nil, "foo")
if err != nil {
t.Errorf("Error getting info: %v", err)
}
if scheme != "https" {
t.Errorf("Expected https, got %s", scheme)
}
if port != 1234 {
t.Errorf("Expected 1234, got %d", port)
}
if transport == nil {
t.Errorf("Expected transport, got nil")
}
}

{
_, _, _, err := client.GetRawConnectionInfo(nil, "foo bar")
if err == nil {
t.Errorf("Expected error getting connection info for invalid node name, got none")
}
if rt == nil {
t.Error("rt should not be nil")
}
}
11 changes: 6 additions & 5 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -100,7 +101,7 @@ type Config struct {
EndpointReconcilerConfig EndpointReconcilerConfig
DeleteCollectionWorkers int
EventTTL time.Duration
KubeletClient kubeletclient.KubeletClient
KubeletClientConfig kubeletclient.KubeletClientConfig
// genericapiserver.RESTStorageProviders provides RESTStorage building methods keyed by groupName
RESTStorageProviders map[string]genericapiserver.RESTStorageProvider
// Used to start and monitor tunneling
Expand Down Expand Up @@ -183,10 +184,10 @@ func (c *Config) SkipComplete() completedConfig {
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClient
// KubeletClientConfig
func (c completedConfig) New() (*Master, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need KubeletClientConfig to be set? The comment on 186 needs updating either way though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restored check, updated comment

if c.KubeletClient == nil {
return nil, fmt.Errorf("Master.New() called with config.KubeletClient == nil")
if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}

s, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
Expand Down Expand Up @@ -226,7 +227,7 @@ func (c completedConfig) New() (*Master, error) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.StorageFactory,
ProxyTransport: s.ProxyTransport,
KubeletClient: c.KubeletClient,
KubeletClientConfig: c.KubeletClientConfig,
EventTTL: c.EventTTL,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
Expand Down
4 changes: 2 additions & 2 deletions pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
openapigen "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/kubelet/client"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -87,7 +87,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
config.GenericConfig.APIResourceConfigSource = DefaultAPIResourceConfigSource()
config.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
config.KubeletClient = client.FakeKubeletClient{}
config.GenericConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
config.GenericConfig.APIGroupPrefix = "/apis"
config.GenericConfig.APIResourceConfigSource = DefaultAPIResourceConfigSource()
Expand All @@ -97,6 +96,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config.GenericConfig.EnableVersion = true
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
config.EnableCoreControllers = false
config.KubeletClientConfig = kubeletclient.KubeletClientConfig{Port: 10250}

// TODO: this is kind of hacky. The trouble is that the sync loop
// runs in a go-routine and there is no way to validate in the test
Expand Down