Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep proxy ssh-tunnels open and healthcheck. #19314

Merged
merged 1 commit into from
Feb 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/tls"
"fmt"
"net"
"net/url"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -211,9 +212,18 @@ func Run(s *options.APIServer) error {
installSSH = instances.AddSSHKeyToAllInstances
}
}

if s.KubeletConfig.Port == 0 {
glog.Fatalf("Must enable kubelet port if proxy ssh-tunneling is specified.")
}
// Set up the tunneler
tunneler = master.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, installSSH)
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
// kubelet listen-addresses, we need to plumb through options.
healthCheckPath := &url.URL{
Scheme: "https",
Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.Port), 10)),
Path: "healthz",
}
tunneler = master.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSH)

// Use the tunneler's dialer to connect to the kubelet
s.KubeletConfig.Dial = tunneler.Dial
Expand Down
4 changes: 2 additions & 2 deletions cmd/kubelet/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewKubeletServer() *KubeletServer {
RktStage1Image: "",
RootDirectory: defaultRootDir,
SerializeImagePulls: true,
StreamingConnectionIdleTimeout: unversioned.Duration{5 * time.Minute},
StreamingConnectionIdleTimeout: unversioned.Duration{4 * time.Hour},
SyncFrequency: unversioned.Duration{1 * time.Minute},
SystemContainer: "",
ReconcileCIDR: true,
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.StringVar(&s.ClusterDNS, "cluster-dns", s.ClusterDNS, "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
fs.DurationVar(&s.StreamingConnectionIdleTimeout.Duration, "streaming-connection-idle-timeout", s.StreamingConnectionIdleTimeout.Duration, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'")
fs.DurationVar(&s.StreamingConnectionIdleTimeout.Duration, "streaming-connection-idle-timeout", s.StreamingConnectionIdleTimeout.Duration, "Maximum time a streaming connection can be idle before the connection is automatically closed. 0 indicates no timeout. Example: '5m'")
fs.DurationVar(&s.NodeStatusUpdateFrequency.Duration, "node-status-update-frequency", s.NodeStatusUpdateFrequency.Duration, "Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 10s")
bindableNodeLabels := util.ConfigurationMap(s.NodeLabels)
fs.Var(&bindableNodeLabels, "node-labels", "<Warning: Alpha feature> Labels to add when registering the node in the cluster. Labels must are key=value pairs seperated by ','.")
Expand Down
4 changes: 2 additions & 2 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,10 @@ func RunKubelet(kcfg *KubeletConfig) error {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet as runonce")
glog.Info("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, kcfg)
glog.Infof("Started kubelet")
glog.Info("Started kubelet")
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions docs/admin/kubelet.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ kubelet
--root-dir="/var/lib/kubelet": Directory path for managing kubelet files (volume mounts,etc).
--runonce[=false]: If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server
--serialize-image-pulls[=true]: Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]
--streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'
--streaming-connection-idle-timeout=4h0m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. 0 indicates no timeout. Example: '5m'
--sync-frequency=1m0s: Max period between synchronizing running containers and config
--system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: "").
--system-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]
Expand All @@ -146,7 +146,7 @@ kubelet
--volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": <Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins
```

###### Auto generated by spf13/cobra on 21-Jan-2016
###### Auto generated by spf13/cobra on 29-Jan-2016


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
Expand Down
28 changes: 14 additions & 14 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,6 @@ func New(c *Config) *Master {
func (m *Master) InstallAPIs(c *Config) {
apiGroupsInfo := []genericapiserver.APIGroupInfo{}

// Run the tunnel.
healthzChecks := []healthz.HealthzChecker{}
if m.tunneler != nil {
m.tunneler.Run(m.getNodeAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
}, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
}

// TODO(nikhiljindal): Refactor generic parts of support services (like /versions) to genericapiserver.
apiserver.InstallSupport(m.MuxHelper, m.RootWebService, c.EnableProfiling, healthzChecks...)

// Install v1 unless disabled.
if !m.ApiGroupVersionOverrides["api/v1"].Disable {
// Install v1 API.
Expand All @@ -192,6 +178,20 @@ func (m *Master) InstallAPIs(c *Config) {
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}

// Run the tunneler.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the move below the v1 API setup? does something in the tunneling now depend on the API or storage being available?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fixing an existing bug I noticed.

The tunneler uses the m.getNodeAddresses function, which calls m.nodeRegistry.ListNodes(). m.nodeRegistry is initialized in m.initV1ResourcesStorage(), which isn't called until after the tunneler starts trying to get node addresses. Right now, we panic and recover in a util.Until loop until the storage is initialized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha, ok

healthzChecks := []healthz.HealthzChecker{}
if m.tunneler != nil {
m.tunneler.Run(m.getNodeAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
}, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
}

// TODO(nikhiljindal): Refactor generic parts of support services (like /versions) to genericapiserver.
apiserver.InstallSupport(m.MuxHelper, m.RootWebService, c.EnableProfiling, healthzChecks...)

// Install root web services
m.HandlerContainer.Add(m.RootWebService)

Expand Down
142 changes: 34 additions & 108 deletions pkg/master/tunneler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package master

import (
"io/ioutil"
"math/rand"
"net"
"net/url"
"os"
"sync"
"sync/atomic"
"time"

Expand All @@ -43,12 +42,12 @@ type Tunneler interface {
}

type SSHTunneler struct {
SSHUser string
SSHKeyfile string
InstallSSHKey InstallSSHKey
SSHUser string
SSHKeyfile string
InstallSSHKey InstallSSHKey
HealthCheckURL *url.URL

tunnels *util.SSHTunnelList
tunnelsLock sync.Mutex
lastSync int64 // Seconds since Epoch
lastSyncMetric prometheus.GaugeFunc
clock util.Clock
Expand All @@ -57,13 +56,13 @@ type SSHTunneler struct {
stopChan chan struct{}
}

func NewSSHTunneler(sshUser string, sshKeyfile string, installSSHKey InstallSSHKey) Tunneler {
func NewSSHTunneler(sshUser, sshKeyfile string, healthCheckURL *url.URL, installSSHKey InstallSSHKey) Tunneler {
return &SSHTunneler{
SSHUser: sshUser,
SSHKeyfile: sshKeyfile,
InstallSSHKey: installSSHKey,

clock: util.RealClock{},
SSHUser: sshUser,
SSHKeyfile: sshKeyfile,
InstallSSHKey: installSSHKey,
HealthCheckURL: healthCheckURL,
clock: util.RealClock{},
}
}

Expand Down Expand Up @@ -93,14 +92,17 @@ func (c *SSHTunneler) Run(getAddresses AddressFunc) {
glog.Errorf("Error detecting if key exists: %v", err)
} else if !exists {
glog.Infof("Key doesn't exist, attempting to create")
err := c.generateSSHKey(c.SSHUser, c.SSHKeyfile, publicKeyFile)
if err != nil {
if err := generateSSHKey(c.SSHKeyfile, publicKeyFile); err != nil {
glog.Errorf("Failed to create key pair: %v", err)
}
}
c.tunnels = &util.SSHTunnelList{}
c.setupSecureProxy(c.SSHUser, c.SSHKeyfile, publicKeyFile)

c.tunnels = util.NewSSHTunnelList(c.SSHUser, c.SSHKeyfile, c.HealthCheckURL, c.stopChan)
// Sync loop to ensure that the SSH key has been installed.
c.installSSHKeySyncLoop(c.SSHUser, publicKeyFile)
// Sync tunnelList w/ nodes.
c.lastSync = c.clock.Now().Unix()
c.nodesSyncLoop()
}

// Stop gracefully shuts down the tunneler
Expand All @@ -112,23 +114,7 @@ func (c *SSHTunneler) Stop() {
}

func (c *SSHTunneler) Dial(net, addr string) (net.Conn, error) {
// Only lock while picking a tunnel.
tunnel, err := func() (util.SSHTunnelEntry, error) {
c.tunnelsLock.Lock()
defer c.tunnelsLock.Unlock()
return c.tunnels.PickRandomTunnel()
}()
if err != nil {
return nil, err
}

start := time.Now()
id := rand.Int63() // So you can match begins/ends in the log.
glog.V(3).Infof("[%x: %v] Dialing...", id, tunnel.Address)
defer func() {
glog.V(3).Infof("[%x: %v] Dialed in %v.", id, tunnel.Address, time.Now().Sub(start))
}()
return tunnel.Tunnel.Dial(net, addr)
return c.tunnels.Dial(net, addr)
}

func (c *SSHTunneler) SecondsSinceSync() int64 {
Expand All @@ -137,61 +123,7 @@ func (c *SSHTunneler) SecondsSinceSync() int64 {
return now - then
}

func (c *SSHTunneler) needToReplaceTunnels(addrs []string) bool {
c.tunnelsLock.Lock()
defer c.tunnelsLock.Unlock()
if c.tunnels == nil || c.tunnels.Len() != len(addrs) {
return true
}
// TODO (cjcullen): This doesn't need to be n^2
for ix := range addrs {
if !c.tunnels.Has(addrs[ix]) {
return true
}
}
return false
}

func (c *SSHTunneler) replaceTunnels(user, keyfile string, newAddrs []string) error {
glog.Infof("replacing tunnels. New addrs: %v", newAddrs)
tunnels := util.MakeSSHTunnels(user, keyfile, newAddrs)
if err := tunnels.Open(); err != nil {
return err
}
c.tunnelsLock.Lock()
defer c.tunnelsLock.Unlock()
if c.tunnels != nil {
c.tunnels.Close()
}
c.tunnels = tunnels
atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix())
return nil
}

func (c *SSHTunneler) loadTunnels(user, keyfile string) error {
addrs, err := c.getAddresses()
if err != nil {
return err
}
if !c.needToReplaceTunnels(addrs) {
return nil
}
// TODO: This is going to unnecessarily close connections to unchanged nodes.
// See comment about using Watch above.
glog.Info("found different nodes. Need to replace tunnels")
return c.replaceTunnels(user, keyfile, addrs)
}

func (c *SSHTunneler) refreshTunnels(user, keyfile string) error {
addrs, err := c.getAddresses()
if err != nil {
return err
}
return c.replaceTunnels(user, keyfile, addrs)
}

func (c *SSHTunneler) setupSecureProxy(user, privateKeyfile, publicKeyfile string) {
// Sync loop to ensure that the SSH key has been installed.
func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
go util.Until(func() {
if c.InstallSSHKey == nil {
glog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil")
Expand All @@ -211,30 +143,24 @@ func (c *SSHTunneler) setupSecureProxy(user, privateKeyfile, publicKeyfile strin
glog.Errorf("Failed to install ssh key: %v", err)
}
}, 5*time.Minute, c.stopChan)
// Sync loop for tunnels
// TODO: switch this to watch.
go util.Until(func() {
if err := c.loadTunnels(user, privateKeyfile); err != nil {
glog.Errorf("Failed to load SSH Tunnels: %v", err)
}
if c.tunnels != nil && c.tunnels.Len() != 0 {
// Sleep for 10 seconds if we have some tunnels.
// TODO (cjcullen): tunnels can lag behind actually existing nodes.
time.Sleep(9 * time.Second)
}
}, 1*time.Second, c.stopChan)
// Refresh loop for tunnels
// TODO: could make this more controller-ish
}

// nodesSyncLoop lists nodes every 15 seconds, calling Update() on the TunnelList
// each time (Update() is a noop if no changes are necessary).
func (c *SSHTunneler) nodesSyncLoop() {
// TODO (cjcullen) make this watch.
go util.Until(func() {
time.Sleep(5 * time.Minute)
if err := c.refreshTunnels(user, privateKeyfile); err != nil {
glog.Errorf("Failed to refresh SSH Tunnels: %v", err)
addrs, err := c.getAddresses()
glog.Infof("Calling update w/ addrs: %v", addrs)
if err != nil {
glog.Errorf("Failed to getAddresses: %v", err)
}
}, 0*time.Second, c.stopChan)
c.tunnels.Update(addrs)
atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix())
}, 15*time.Second, c.stopChan)
}

func (c *SSHTunneler) generateSSHKey(user, privateKeyfile, publicKeyfile string) error {
// TODO: user is not used. Consider removing it as an input to the function.
func generateSSHKey(privateKeyfile, publicKeyfile string) error {
private, public, err := util.GenerateKey(2048)
if err != nil {
return err
Expand Down
20 changes: 3 additions & 17 deletions pkg/master/tunneler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,6 @@ func TestSecondsSinceSync(t *testing.T) {
assert.Equal(int64(-2678400), tunneler.SecondsSinceSync())
}

// TestRefreshTunnels verifies that refreshTunnels returns an error when
// getAddresses reports no addresses associated with nodes.
func TestRefreshTunnels(t *testing.T) {
tunneler := &SSHTunneler{}
// Stub the address source: it succeeds but yields an empty address list.
tunneler.getAddresses = func() ([]string, error) { return []string{}, nil }
assert := assert.New(t)

// Fail case (no addresses associated with nodes): refreshTunnels is
// expected to report an error rather than succeed.
assert.Error(tunneler.refreshTunnels("test", "/somepath/undefined"))

// TODO: pass case without needing actual connections?
}

// TestIsTunnelSyncHealthy verifies that the 600 second lag test
// is honored.
func TestIsTunnelSyncHealthy(t *testing.T) {
Expand Down Expand Up @@ -108,7 +95,6 @@ func generateTempFilePath(prefix string) string {
// TestGenerateSSHKey verifies that SSH key generation does indeed
// generate keys even when keys already exist.
func TestGenerateSSHKey(t *testing.T) {
tunneler := &SSHTunneler{}
assert := assert.New(t)

privateKey := generateTempFilePath("private")
Expand All @@ -119,17 +105,17 @@ func TestGenerateSSHKey(t *testing.T) {
os.Remove(publicKey)

// Pass case: Sunny day case
err := tunneler.generateSSHKey("unused", privateKey, publicKey)
err := generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)

// Pass case: PrivateKey exists test case
os.Remove(publicKey)
err = tunneler.generateSSHKey("unused", privateKey, publicKey)
err = generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)

// Pass case: PublicKey exists test case
os.Remove(privateKey)
err = tunneler.generateSSHKey("unused", privateKey, publicKey)
err = generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)

// Make sure we have no test keys laying around
Expand Down