Skip to content

Commit

Permalink
Merge pull request #7591 from tstromberg/oci-go-faster
Browse files Browse the repository at this point in the history
kic: Use SSHRunner by default (20% faster startup)
  • Loading branch information
medyagh committed Apr 11, 2020
2 parents 110a7c0 + e98370a commit 9b9d7c3
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 130 deletions.
15 changes: 8 additions & 7 deletions cmd/minikube/cmd/delete.go
Expand Up @@ -329,23 +329,24 @@ func profileDeletionErr(cname string, additionalInfo string) error {

func uninstallKubernetes(api libmachine.API, cc config.ClusterConfig, n config.Node, bsName string) error {
out.T(out.Resetting, "Uninstalling Kubernetes {{.kubernetes_version}} using {{.bootstrapper_name}} ...", out.V{"kubernetes_version": cc.KubernetesConfig.KubernetesVersion, "bootstrapper_name": bsName})
clusterBootstrapper, err := cluster.Bootstrapper(api, bsName, cc, n)
host, err := machine.LoadHost(api, driver.MachineName(cc, n))
if err != nil {
return DeletionError{Err: fmt.Errorf("unable to get bootstrapper: %v", err), Errtype: Fatal}
return DeletionError{Err: fmt.Errorf("unable to load host: %v", err), Errtype: MissingCluster}
}

host, err := machine.LoadHost(api, driver.MachineName(cc, n))
r, err := machine.CommandRunner(host)
if err != nil {
exit.WithError("Error getting host", err)
return DeletionError{Err: fmt.Errorf("unable to get command runner %v", err), Errtype: MissingCluster}
}
r, err := machine.CommandRunner(host)

clusterBootstrapper, err := cluster.Bootstrapper(api, bsName, cc, r)
if err != nil {
exit.WithError("Failed to get command runner", err)
return DeletionError{Err: fmt.Errorf("unable to get bootstrapper: %v", err), Errtype: Fatal}
}

cr, err := cruntime.New(cruntime.Config{Type: cc.KubernetesConfig.ContainerRuntime, Runner: r})
if err != nil {
exit.WithError("Failed runtime", err)
return DeletionError{Err: fmt.Errorf("unable to get runtime: %v", err), Errtype: Fatal}
}

// Unpause the cluster if necessary to avoid hung kubeadm
Expand Down
2 changes: 1 addition & 1 deletion cmd/minikube/cmd/logs.go
Expand Up @@ -53,7 +53,7 @@ var logsCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
co := mustload.Running(ClusterFlagValue())

bs, err := cluster.Bootstrapper(co.API, viper.GetString(cmdcfg.Bootstrapper), *co.Config, *co.CP.Node)
bs, err := cluster.Bootstrapper(co.API, viper.GetString(cmdcfg.Bootstrapper), *co.Config, co.CP.Runner)
if err != nil {
exit.WithError("Error getting cluster bootstrapper", err)
}
Expand Down
56 changes: 27 additions & 29 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
Expand Up @@ -68,18 +68,8 @@ type Bootstrapper struct {
}

// NewBootstrapper creates a new kubeadm.Bootstrapper
// TODO(#6891): Remove node as an argument
func NewBootstrapper(api libmachine.API, cc config.ClusterConfig, n config.Node) (*Bootstrapper, error) {
name := driver.MachineName(cc, n)
h, err := api.Load(name)
if err != nil {
return nil, errors.Wrap(err, "getting api client")
}
runner, err := machine.CommandRunner(h)
if err != nil {
return nil, errors.Wrap(err, "command runner")
}
return &Bootstrapper{c: runner, contextName: cc.Name, k8sClient: nil}, nil
func NewBootstrapper(api libmachine.API, cc config.ClusterConfig, r command.Runner) (*Bootstrapper, error) {
return &Bootstrapper{c: r, contextName: cc.Name, k8sClient: nil}, nil
}

// GetAPIServerStatus returns the api-server status
Expand Down Expand Up @@ -111,8 +101,7 @@ func (k *Bootstrapper) LogCommands(cfg config.ClusterConfig, o bootstrapper.LogO
dmesg.WriteString(fmt.Sprintf(" | tail -n %d", o.Lines))
}

describeNodes := fmt.Sprintf("sudo %s describe nodes --kubeconfig=%s",
path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"),
describeNodes := fmt.Sprintf("sudo %s describe nodes --kubeconfig=%s", kubectlPath(cfg),
path.Join(vmpath.GuestPersistentDir, "kubeconfig"))

return map[string]string{
Expand Down Expand Up @@ -218,7 +207,7 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error {
go func() {
// the overlay is required for containerd and cri-o runtime: see #7428
if driver.IsKIC(cfg.Driver) && cfg.KubernetesConfig.ContainerRuntime != "docker" {
if err := k.applyKicOverlay(cfg); err != nil {
if err := k.applyKICOverlay(cfg); err != nil {
glog.Errorf("failed to apply kic overlay: %v", err)
}
}
Expand Down Expand Up @@ -704,7 +693,7 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
}

// Installs compatibility shims for non-systemd environments
kubeletPath := path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl")
kubeletPath := path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubelet")
shims, err := sm.GenerateInitShim("kubelet", kubeletPath, bsutil.KubeletSystemdConfFile)
if err != nil {
return errors.Wrap(err, "shim")
Expand Down Expand Up @@ -764,21 +753,32 @@ func startKubeletIfRequired(runner command.Runner, sm sysinit.Manager) error {
return sm.Start("kubelet")
}

// applyKicOverlay applies the CNI plugin needed to make kic work
func (k *Bootstrapper) applyKicOverlay(cfg config.ClusterConfig) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

cmd := exec.CommandContext(ctx, "sudo",
path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"), "create", fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")),
"-f", "-")
// kubectlPath returns the path to the kubelet
func kubectlPath(cfg config.ClusterConfig) string {
return path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl")
}

// applyKICOverlay applies the CNI plugin needed to make kic work
func (k *Bootstrapper) applyKICOverlay(cfg config.ClusterConfig) error {
b := bytes.Buffer{}
if err := kicCNIConfig.Execute(&b, struct{ ImageName string }{ImageName: kic.OverlayImage}); err != nil {
return err
}

cmd.Stdin = bytes.NewReader(b.Bytes())
ko := path.Join(vmpath.GuestEphemeralDir, fmt.Sprintf("kic_overlay.yaml"))
f := assets.NewMemoryAssetTarget(b.Bytes(), ko, "0644")

if err := k.c.Copy(f); err != nil {
return errors.Wrapf(err, "copy")
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

cmd := exec.CommandContext(ctx, "sudo", kubectlPath(cfg), "apply",
fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")),
"-f", ko)

if rr, err := k.c.RunCmd(cmd); err != nil {
return errors.Wrapf(err, "cmd: %s output: %s", rr.Command(), rr.Output())
}
Expand Down Expand Up @@ -807,8 +807,7 @@ func (k *Bootstrapper) applyNodeLabels(cfg config.ClusterConfig) error {
defer cancel()
// example:
// sudo /var/lib/minikube/binaries/<version>/kubectl label nodes minikube.k8s.io/version=<version> minikube.k8s.io/commit=aa91f39ffbcf27dcbb93c4ff3f457c54e585cf4a-dirty minikube.k8s.io/name=p1 minikube.k8s.io/updated_at=2020_02_20T12_05_35_0700 --all --overwrite --kubeconfig=/var/lib/minikube/kubeconfig
cmd := exec.CommandContext(ctx, "sudo",
path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"),
cmd := exec.CommandContext(ctx, "sudo", kubectlPath(cfg),
"label", "nodes", verLbl, commitLbl, nameLbl, createdAtLbl, "--all", "--overwrite",
fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")))

Expand All @@ -826,8 +825,7 @@ func (k *Bootstrapper) elevateKubeSystemPrivileges(cfg config.ClusterConfig) err
defer cancel()
rbacName := "minikube-rbac"
// kubectl create clusterrolebinding minikube-rbac --clusterrole=cluster-admin --serviceaccount=kube-system:default
cmd := exec.CommandContext(ctx, "sudo",
path.Join(vmpath.GuestPersistentDir, "binaries", cfg.KubernetesConfig.KubernetesVersion, "kubectl"),
cmd := exec.CommandContext(ctx, "sudo", kubectlPath(cfg),
"create", "clusterrolebinding", rbacName, "--clusterrole=cluster-admin", "--serviceaccount=kube-system:default",
fmt.Sprintf("--kubeconfig=%s", path.Join(vmpath.GuestPersistentDir, "kubeconfig")))
rr, err := k.c.RunCmd(cmd)
Expand Down
5 changes: 3 additions & 2 deletions pkg/minikube/cluster/cluster.go
Expand Up @@ -26,6 +26,7 @@ import (

"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/bootstrapper/kubeadm"
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/exit"
)
Expand All @@ -44,12 +45,12 @@ func init() {

// Bootstrapper returns a new bootstrapper for the cluster
// TODO(#6891): Remove node as an argument
func Bootstrapper(api libmachine.API, bootstrapperName string, cc config.ClusterConfig, n config.Node) (bootstrapper.Bootstrapper, error) {
func Bootstrapper(api libmachine.API, bootstrapperName string, cc config.ClusterConfig, r command.Runner) (bootstrapper.Bootstrapper, error) {
var b bootstrapper.Bootstrapper
var err error
switch bootstrapperName {
case bootstrapper.Kubeadm:
b, err = kubeadm.NewBootstrapper(api, cc, n)
b, err = kubeadm.NewBootstrapper(api, cc, r)
if err != nil {
return nil, errors.Wrap(err, "getting a new kubeadm bootstrapper")
}
Expand Down
64 changes: 59 additions & 5 deletions pkg/minikube/command/ssh_runner.go
Expand Up @@ -25,12 +25,15 @@ import (
"sync"
"time"

"github.com/docker/machine/libmachine/drivers"
"github.com/golang/glog"
"github.com/kballard/go-shellquote"
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
"golang.org/x/sync/errgroup"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/sshutil"
"k8s.io/minikube/pkg/util/retry"
)

var (
Expand All @@ -41,21 +44,61 @@ var (
//
// It implements the CommandRunner interface.
type SSHRunner struct {
d drivers.Driver
c *ssh.Client
}

// NewSSHRunner returns a new SSHRunner that will run commands
// through the ssh.Client provided.
func NewSSHRunner(c *ssh.Client) *SSHRunner {
return &SSHRunner{c}
func NewSSHRunner(d drivers.Driver) *SSHRunner {
return &SSHRunner{d: d, c: nil}
}

// client returns an ssh client (uses retry underneath)
func (s *SSHRunner) client() (*ssh.Client, error) {
if s.c != nil {
return s.c, nil
}

c, err := sshutil.NewSSHClient(s.d)
if err != nil {
return nil, errors.Wrap(err, "new client")
}
s.c = c
return s.c, nil
}

// session returns an ssh session, retrying if necessary
func (s *SSHRunner) session() (*ssh.Session, error) {
var sess *ssh.Session
getSession := func() (err error) {
client, err := s.client()
if err != nil {
return errors.Wrap(err, "new client")
}

sess, err = client.NewSession()
if err != nil {
glog.Warningf("session error, resetting client: %v", err)
s.c = nil
return err
}
return nil
}

if err := retry.Expo(getSession, 250*time.Millisecond, 2*time.Second); err != nil {
return nil, err
}

return sess, nil
}

// Remove runs a command to delete a file on the remote.
func (s *SSHRunner) Remove(f assets.CopyableFile) error {
dst := path.Join(f.GetTargetDir(), f.GetTargetName())
glog.Infof("rm: %s", dst)

sess, err := s.c.NewSession()
sess, err := s.session()
if err != nil {
return errors.Wrap(err, "getting ssh session")
}
Expand Down Expand Up @@ -97,6 +140,10 @@ func teeSSH(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error {

// RunCmd implements the Command Runner interface to run a exec.Cmd object
func (s *SSHRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {
if cmd.Stdin != nil {
return nil, fmt.Errorf("SSHRunner does not support stdin - you could be the first to add it")
}

rr := &RunResult{Args: cmd.Args}
glog.Infof("Run: %v", rr.Command())

Expand All @@ -117,7 +164,7 @@ func (s *SSHRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {
errb = io.MultiWriter(cmd.Stderr, &rr.Stderr)
}

sess, err := s.c.NewSession()
sess, err := s.session()
if err != nil {
return rr, errors.Wrap(err, "NewSession")
}
Expand Down Expand Up @@ -170,10 +217,17 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error {
glog.Warningf("0 byte asset: %+v", f)
}

sess, err := s.c.NewSession()
sess, err := s.session()
if err != nil {
return errors.Wrap(err, "NewSession")
}
defer func() {
if err := sess.Close(); err != nil {
if err != io.EOF {
glog.Errorf("session close: %v", err)
}
}
}()

w, err := sess.StdinPipe()
if err != nil {
Expand Down
19 changes: 3 additions & 16 deletions pkg/minikube/machine/client.go
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/docker/machine/libmachine/host"
"github.com/docker/machine/libmachine/mcnutils"
"github.com/docker/machine/libmachine/persist"
"github.com/docker/machine/libmachine/ssh"
lmssh "github.com/docker/machine/libmachine/ssh"
"github.com/docker/machine/libmachine/state"
"github.com/docker/machine/libmachine/swarm"
"github.com/docker/machine/libmachine/version"
Expand All @@ -48,13 +48,12 @@ import (
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/registry"
"k8s.io/minikube/pkg/minikube/sshutil"
)

// NewRPCClient gets a new client.
func NewRPCClient(storePath, certsDir string) libmachine.API {
c := libmachine.NewClient(storePath, certsDir)
c.SSHClientType = ssh.Native
c.SSHClientType = lmssh.Native
return c
}

Expand Down Expand Up @@ -154,19 +153,7 @@ func CommandRunner(h *host.Host) (command.Runner, error) {
return command.NewExecRunner(), nil
}

if driver.IsKIC(h.Driver.DriverName()) {
return command.NewKICRunner(h.Name, h.Driver.DriverName()), nil
}
return SSHRunner(h)
}

// SSHRunner returns an SSH runner for the host
func SSHRunner(h *host.Host) (command.Runner, error) {
client, err := sshutil.NewSSHClient(h.Driver)
if err != nil {
return nil, errors.Wrap(err, "getting ssh client for bootstrapper")
}
return command.NewSSHRunner(client), nil
return command.NewSSHRunner(h.Driver), nil
}

// Create creates the host
Expand Down

0 comments on commit 9b9d7c3

Please sign in to comment.