Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use nodeutil.GetHostIP consistently when talking to nodes #33718

Merged
merged 2 commits into from Oct 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/kubelet/client/kubelet_client.go
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/transport"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net"
)

Expand All @@ -51,11 +52,11 @@ type KubeletClientConfig struct {

// KubeletClient is an interface for all kubelet functionality
type KubeletClient interface {
ConnectionInfoGetter
GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, port uint, transport http.RoundTripper, err error)
}

type ConnectionInfoGetter interface {
GetConnectionInfo(ctx api.Context, nodeName string) (scheme string, port uint, transport http.RoundTripper, err error)
GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, host string, port uint, transport http.RoundTripper, err error)
}

// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP.
Expand Down Expand Up @@ -98,8 +99,8 @@ func NewStaticKubeletClient(config *KubeletClientConfig) (KubeletClient, error)
}

// In default HTTPKubeletClient ctx is unused.
func (c *HTTPKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
if errs := validation.ValidateNodeName(nodeName, false); len(errs) != 0 {
func (c *HTTPKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
if errs := validation.ValidateNodeName(string(nodeName), false); len(errs) != 0 {
return "", 0, nil, fmt.Errorf("invalid node name: %s", strings.Join(errs, ";"))
}
scheme := "http"
Expand All @@ -114,7 +115,7 @@ func (c *HTTPKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string)
// no kubelets.
type FakeKubeletClient struct{}

func (c FakeKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
func (c FakeKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/client/kubelet_client_test.go
Expand Up @@ -102,7 +102,7 @@ func TestNewKubeletClientTLSValid(t *testing.T) {
}

{
scheme, port, transport, err := client.GetConnectionInfo(nil, "foo")
scheme, port, transport, err := client.GetRawConnectionInfo(nil, "foo")
if err != nil {
t.Errorf("Error getting info: %v", err)
}
Expand All @@ -118,7 +118,7 @@ func TestNewKubeletClientTLSValid(t *testing.T) {
}

{
_, _, _, err := client.GetConnectionInfo(nil, "foo bar")
_, _, _, err := client.GetRawConnectionInfo(nil, "foo bar")
if err == nil {
t.Errorf("Expected error getting connection info for invalid node name, got none")
}
Expand Down
37 changes: 20 additions & 17 deletions pkg/registry/core/node/etcd/etcd.go
Expand Up @@ -30,6 +30,8 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)

// NodeStorage includes storage for nodes and all sub resources
Expand All @@ -41,7 +43,7 @@ type NodeStorage struct {

type REST struct {
*registry.Store
connection client.ConnectionInfoGetter
connection client.KubeletClient
proxyTransport http.RoundTripper
}

Expand All @@ -65,7 +67,7 @@ func (r *StatusREST) Update(ctx api.Context, name string, objInfo rest.UpdatedOb
}

// NewStorage returns a NodeStorage object that will work against nodes.
func NewStorage(opts generic.RESTOptions, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) NodeStorage {
func NewStorage(opts generic.RESTOptions, connection client.KubeletClient, proxyTransport http.RoundTripper) NodeStorage {
prefix := "/" + opts.ResourcePrefix

newListFunc := func() runtime.Object { return &api.NodeList{} }
Expand Down Expand Up @@ -126,30 +128,31 @@ func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Roun

var _ = client.ConnectionInfoGetter(&REST{})

func (r *REST) getKubeletPort(ctx api.Context, nodeName string) (int, error) {
func (r *REST) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, string, uint, http.RoundTripper, error) {
scheme, port, transport, err := r.connection.GetRawConnectionInfo(ctx, nodeName)
if err != nil {
return "", "", 0, nil, err
}

// We probably shouldn't care about context when looking for Node object.
obj, err := r.Get(ctx, nodeName)
obj, err := r.Get(ctx, string(nodeName))
if err != nil {
return 0, err
return "", "", 0, nil, err
}
node, ok := obj.(*api.Node)
if !ok {
return 0, fmt.Errorf("Unexpected object type: %#v", node)
return "", "", 0, nil, fmt.Errorf("Unexpected object type: %#v", node)
}
return int(node.Status.DaemonEndpoints.KubeletEndpoint.Port), nil
}

func (c *REST) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
scheme, port, transport, err := c.connection.GetConnectionInfo(ctx, nodeName)
hostIP, err := nodeutil.GetNodeHostIP(node)
if err != nil {
return "", 0, nil, err
}
daemonPort, err := c.getKubeletPort(ctx, nodeName)
if err != nil {
return "", 0, nil, err
return "", "", 0, nil, err
}
host := hostIP.String()

daemonPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if daemonPort > 0 {
return scheme, uint(daemonPort), transport, nil
return scheme, host, uint(daemonPort), transport, nil
}
return scheme, port, transport, nil
return scheme, host, port, transport, nil
}
3 changes: 2 additions & 1 deletion pkg/registry/core/node/etcd/etcd_test.go
Expand Up @@ -28,12 +28,13 @@ import (
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/types"
)

type fakeConnectionInfoGetter struct {
}

func (fakeConnectionInfoGetter) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
func (fakeConnectionInfoGetter) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
return "http", 12345, nil, nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/registry/core/node/strategy.go
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
pkgstorage "k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/validation/field"
Expand Down Expand Up @@ -189,13 +190,13 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
// We check if we want to get a default Kubelet's transport. It happens if either:
// - no port is specified in request (Kubelet's port is default),
// - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint,
// - there's no information in the API about DaemonEnpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config)
// - there's no information in the API about DaemonEndpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config)
kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
if kubeletPort == 0 {
kubeletPort = ports.KubeletPort
}
if portReq == "" || strconv.Itoa(int(kubeletPort)) == portReq {
scheme, port, kubeletTransport, err := connection.GetConnectionInfo(ctx, node.Name)
scheme, host, port, kubeletTransport, err := connection.GetConnectionInfo(ctx, types.NodeName(node.Name))
if err != nil {
return nil, nil, err
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/registry/core/pod/strategy.go
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/validation/field"
)
Expand Down Expand Up @@ -299,12 +300,12 @@ func LogLocation(
return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name))
}
}
nodeHost := pod.Spec.NodeName
if len(nodeHost) == 0 {
nodeName := types.NodeName(pod.Spec.NodeName)
if len(nodeName) == 0 {
// If pod has not been assigned a host, return an empty location
return nil, nil, nil
}
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost)
nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -450,12 +451,12 @@ func streamLocation(
return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name))
}
}
nodeHost := pod.Spec.NodeName
if len(nodeHost) == 0 {
nodeName := types.NodeName(pod.Spec.NodeName)
if len(nodeName) == 0 {
// If pod has not been assigned a host, return an empty location
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
}
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost)
nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -484,12 +485,12 @@ func PortForwardLocation(
return nil, nil, err
}

nodeHost := pod.Spec.NodeName
if len(nodeHost) == 0 {
nodeName := types.NodeName(pod.Spec.NodeName)
if len(nodeName) == 0 {
// If pod has not been assigned a host, return an empty location
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
}
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost)
nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
if err != nil {
return nil, nil, err
}
Expand Down