Skip to content

Commit

Permalink
Switch windows runtime endpoints to npipe
Browse files Browse the repository at this point in the history
  • Loading branch information
feiskyer committed Oct 22, 2018
1 parent cf3a930 commit 053b71d
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 37 deletions.
6 changes: 3 additions & 3 deletions cmd/kubelet/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func NewKubeletFlags() *KubeletFlags {
if runtime.GOOS == "linux" {
remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
} else if runtime.GOOS == "windows" {
remoteRuntimeEndpoint = "tcp://localhost:3735"
remoteRuntimeEndpoint = "npipe:////./pipe/dockershim"
}

return &KubeletFlags{
Expand Down Expand Up @@ -376,8 +376,8 @@ func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
fs.StringVar(&f.ExperimentalMounterPath, "experimental-mounter-path", f.ExperimentalMounterPath, "[Experimental] Path of mounter binary. Leave empty to use the default mount.")
fs.StringSliceVar(&f.AllowedUnsafeSysctls, "allowed-unsafe-sysctls", f.AllowedUnsafeSysctls, "Comma-separated whitelist of unsafe sysctls or unsafe sysctl patterns (ending in *). Use these at your own risk. Sysctls feature gate is enabled by default.")
fs.BoolVar(&f.ExperimentalKernelMemcgNotification, "experimental-kernel-memcg-notification", f.ExperimentalKernelMemcgNotification, "If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.")
fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket is supported on Linux, and tcp is supported on windows. Examples:'unix:///var/run/dockershim.sock', 'tcp://localhost:3735'")
fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket is supported on Linux, and tcp is supported on windows. Examples:'unix:///var/run/dockershim.sock', 'tcp://localhost:3735'")
fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket and tcp endpoints are supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket and tcp endpoints are supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
fs.BoolVar(&f.ExperimentalCheckNodeCapabilitiesBeforeMount, "experimental-check-node-capabilities-before-mount", f.ExperimentalCheckNodeCapabilitiesBeforeMount, "[Experimental] if set true, the kubelet will check the underlying node for required components (binaries, etc.) before performing the mount")
fs.BoolVar(&f.ExperimentalNodeAllocatableIgnoreEvictionThreshold, "experimental-allocatable-ignore-eviction", f.ExperimentalNodeAllocatableIgnoreEvictionThreshold, "When set to 'true', Hard Eviction Thresholds will be ignored while calculating Node Allocatable. See https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/ for more details. [default=false]")
bindableNodeLabels := flag.ConfigurationMap(f.NodeLabels)
Expand Down
27 changes: 23 additions & 4 deletions pkg/kubelet/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,27 @@ load(

go_test(
name = "go_default_test",
srcs = ["util_test.go"],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
srcs = [
"util_unix_test.go",
"util_windows_test.go",
],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:darwin": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"//conditions:default": [],
}),
)

go_library(
Expand Down Expand Up @@ -40,6 +56,9 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//vendor/github.com/Microsoft/go-winio:go_default_library",
],
"//conditions:default": [],
}),
)
Expand Down
20 changes: 0 additions & 20 deletions pkg/kubelet/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ limitations under the License.
package util

import (
"fmt"
"net/url"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -28,20 +25,3 @@ import (
func FromApiserverCache(opts *metav1.GetOptions) {
opts.ResourceVersion = "0"
}

func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}

if u.Scheme == "tcp" {
return "tcp", u.Host, nil
} else if u.Scheme == "unix" {
return "unix", u.Path, nil
} else if u.Scheme == "" {
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
} else {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
22 changes: 22 additions & 0 deletions pkg/kubelet/util/util_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package util
import (
"fmt"
"net"
"net/url"
"os"
"time"

Expand Down Expand Up @@ -77,3 +78,24 @@ func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string)
}
return
}

func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}

switch u.Scheme {
case "tcp":
return "tcp", u.Host, nil

case "unix":
return "unix", u.Path, nil

case "":
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)

default:
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// +build freebsd linux darwin

/*
Copyright 2017 The Kubernetes Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +41,11 @@ func TestParseEndpoint(t *testing.T) {
expectedProtocol: "tcp",
expectedAddr: "localhost:15880",
},
{
endpoint: "npipe://./pipe/mypipe",
expectedProtocol: "npipe",
expectError: true,
},
{
endpoint: "tcp1://abc",
expectedProtocol: "tcp1",
Expand Down
66 changes: 57 additions & 9 deletions pkg/kubelet/util/util_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,85 @@ package util
import (
"fmt"
"net"
"net/url"
"strings"
"time"

"github.com/Microsoft/go-winio"
)

const (
tcpProtocol = "tcp"
tcpProtocol = "tcp"
npipeProtocol = "npipe"
)

func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return nil, err
}
if protocol != tcpProtocol {
return nil, fmt.Errorf("only support tcp endpoint")
}

return net.Listen(protocol, addr)
switch protocol {
case tcpProtocol:
return net.Listen(tcpProtocol, addr)

case npipeProtocol:
return winio.ListenPipe(addr, nil)

default:
return nil, fmt.Errorf("only support tcp and npipe endpoint")
}
}

func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return "", nil, err
}
if protocol != tcpProtocol {
return "", nil, fmt.Errorf("only support tcp endpoint")

if protocol == tcpProtocol {
return addr, tcpDial, nil
}

if protocol == npipeProtocol {
return addr, npipeDial, nil
}

return addr, dial, nil
return "", nil, fmt.Errorf("only support tcp and npipe endpoint")
}

func dial(addr string, timeout time.Duration) (net.Conn, error) {
func tcpDial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(tcpProtocol, addr, timeout)
}

func npipeDial(addr string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(addr, &timeout)
}

func parseEndpoint(endpoint string) (string, string, error) {
// url.Parse doesn't recognize \, so replace with / first.
endpoint = strings.Replace(endpoint, "\\", "/", -1)
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}

if u.Scheme == "tcp" {
return "tcp", u.Host, nil
} else if u.Scheme == "npipe" {
if strings.HasPrefix(u.Path, "//./pipe") {
return "npipe", u.Path, nil
}

// fallback host if not provided.
host := u.Host
if host == "" {
host = "."
}
return "npipe", fmt.Sprintf("//%s%s", host, u.Path), nil
} else if u.Scheme == "" {
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
} else {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
92 changes: 92 additions & 0 deletions pkg/kubelet/util/util_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// +build windows

/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParseEndpoint(t *testing.T) {
tests := []struct {
endpoint string
expectError bool
expectedProtocol string
expectedAddr string
}{
{
endpoint: "unix:///tmp/s1.sock",
expectedProtocol: "unix",
expectError: true,
},
{
endpoint: "tcp://localhost:15880",
expectedProtocol: "tcp",
expectedAddr: "localhost:15880",
},
{
endpoint: "npipe://./pipe/mypipe",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe",
},
{
endpoint: "npipe:////./pipe/mypipe2",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe2",
},
{
endpoint: "npipe:/pipe/mypipe3",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe3",
},
{
endpoint: "npipe:\\\\.\\pipe\\mypipe4",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe4",
},
{
endpoint: "npipe:\\pipe\\mypipe5",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe5",
},
{
endpoint: "tcp1://abc",
expectedProtocol: "tcp1",
expectError: true,
},
{
endpoint: "a b c",
expectError: true,
},
}

for _, test := range tests {
protocol, addr, err := parseEndpoint(test.endpoint)
assert.Equal(t, test.expectedProtocol, protocol)
if test.expectError {
assert.NotNil(t, err, "Expect error during parsing %q", test.endpoint)
continue
}
require.Nil(t, err, "Expect no error during parsing %q", test.endpoint)
assert.Equal(t, test.expectedAddr, addr)
}

}

0 comments on commit 053b71d

Please sign in to comment.