Skip to content

Commit

Permalink
Support multi control plane
Browse files Browse the repository at this point in the history
  • Loading branch information
lingsamuel committed Dec 10, 2020
1 parent ead11c6 commit 60b77a7
Show file tree
Hide file tree
Showing 20 changed files with 211 additions and 52 deletions.
14 changes: 13 additions & 1 deletion cmd/minikube/cmd/node_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/driver"
"k8s.io/minikube/pkg/minikube/exit"
"k8s.io/minikube/pkg/minikube/mustload"
"k8s.io/minikube/pkg/minikube/node"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/out/register"
"k8s.io/minikube/pkg/minikube/reason"
"k8s.io/minikube/pkg/minikube/style"
)
Expand All @@ -48,16 +50,25 @@ var nodeAddCmd = &cobra.Command{

name := node.Name(len(cc.Nodes) + 1)

out.Step(style.Happy, "Adding node {{.name}} to cluster {{.cluster}}", out.V{"name": name, "cluster": cc.Name})
if cp {
out.Step(style.Happy, "Adding control plane node {{.name}} to cluster {{.cluster}}", out.V{"name": name, "cluster": cc.Name})
} else {
out.Step(style.Happy, "Adding node {{.name}} to cluster {{.cluster}}", out.V{"name": name, "cluster": cc.Name})
}

// TODO: Deal with parameters better. Ideally we should be able to acceot any node-specific minikube start params here.
n := config.Node{
Name: name,
Worker: worker,
ControlPlane: cp,
ApiEndpointServer: false,
KubernetesVersion: cc.KubernetesConfig.KubernetesVersion,
}

if n.ControlPlane {
n.Port = constants.APIServerPort
}

// Make sure to decrease the default amount of memory we use per VM if this is the first worker node
if len(cc.Nodes) == 1 {
warnAboutMultiNode()
Expand All @@ -66,6 +77,7 @@ var nodeAddCmd = &cobra.Command{
}
}

register.Reg.SetStep(register.InitialSetup)
if err := node.Add(cc, n, false); err != nil {
_, err := maybeDeleteAndRetry(cmd, *cc, n, nil, err)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/minikube/cmd/node_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var nodeStartCmd = &cobra.Command{
}

register.Reg.SetStep(register.InitialSetup)
r, p, m, h, err := node.Provision(cc, n, n.ControlPlane, viper.GetBool(deleteOnFailure))
r, p, m, h, err := node.Provision(cc, n, viper.GetBool(deleteOnFailure))
if err != nil {
exit.Error(reason.GuestNodeProvision, "provisioning host for node", err)
}
Expand All @@ -71,7 +71,7 @@ var nodeStartCmd = &cobra.Command{
ExistingAddons: nil,
}

_, err = node.Start(s, n.ControlPlane)
_, err = node.Start(s)
if err != nil {
_, err := maybeDeleteAndRetry(cmd, *cc, *n, nil, err)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/minikube/cmd/node_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var nodeStopCmd = &cobra.Command{
}

machineName := driver.MachineName(*cc, *n)

node.MustReset(*cc, *n, api, machineName)
err = machine.StopHost(api, machineName)
if err != nil {
out.FatalT("Failed to stop node {{.name}}", out.V{"name": name})
Expand Down
18 changes: 13 additions & 5 deletions cmd/minikube/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func provisionWithDriver(cmd *cobra.Command, ds registry.DriverState, existing *
ssh.SetDefaultClient(ssh.External)
}

mRunner, preExists, mAPI, host, err := node.Provision(&cc, &n, true, viper.GetBool(deleteOnFailure))
mRunner, preExists, mAPI, host, err := node.Provision(&cc, &n, viper.GetBool(deleteOnFailure))
if err != nil {
return node.Starter{}, err
}
Expand All @@ -343,7 +343,13 @@ func provisionWithDriver(cmd *cobra.Command, ds registry.DriverState, existing *
}

func startWithDriver(cmd *cobra.Command, starter node.Starter, existing *config.ClusterConfig) (*kubeconfig.Settings, error) {
kubeconfig, err := node.Start(starter, true)
// TODO: Currently, we start the primary control plane first. If there are multiple control planes,
// the kube-apiserver will keep crash to wait for other apiserver to respond, which blocks health checks.
// As a temporary solution, we reset the stacked control planes before we stopped it.
// To fix this, we could:
// - Delay the health check.
// - Start all control planes at the same time.
kubeconfig, err := node.Start(starter)
if err != nil {
kubeconfig, err = maybeDeleteAndRetry(cmd, *starter.Cfg, *starter.Node, starter.ExistingAddons, err)
if err != nil {
Expand Down Expand Up @@ -374,6 +380,7 @@ func startWithDriver(cmd *cobra.Command, starter node.Starter, existing *config.
Name: nodeName,
Worker: true,
ControlPlane: false,
ApiEndpointServer: false,
KubernetesVersion: starter.Cfg.KubernetesConfig.KubernetesVersion,
}
out.Ln("") // extra newline for clarity on the command line
Expand All @@ -384,7 +391,7 @@ func startWithDriver(cmd *cobra.Command, starter node.Starter, existing *config.
}
} else {
for _, n := range existing.Nodes {
if !n.ControlPlane {
if !n.ApiEndpointServer { // TODO Make this backward compatibility
err := node.Add(starter.Cfg, n, viper.GetBool(deleteOnFailure))
if err != nil {
return nil, errors.Wrap(err, "adding node")
Expand Down Expand Up @@ -492,7 +499,7 @@ func maybeDeleteAndRetry(cmd *cobra.Command, existing config.ClusterConfig, n co
cc := updateExistingConfigFromFlags(cmd, &existing)
var kubeconfig *kubeconfig.Settings
for _, n := range cc.Nodes {
r, p, m, h, err := node.Provision(&cc, &n, n.ControlPlane, false)
r, p, m, h, err := node.Provision(&cc, &n, false)
s := node.Starter{
Runner: r,
PreExists: p,
Expand All @@ -507,7 +514,7 @@ func maybeDeleteAndRetry(cmd *cobra.Command, existing config.ClusterConfig, n co
return nil, err
}

k, err := node.Start(s, n.ControlPlane)
k, err := node.Start(s)
if n.ControlPlane {
kubeconfig = k
}
Expand Down Expand Up @@ -1142,6 +1149,7 @@ func createNode(cc config.ClusterConfig, kubeNodeName string, existing *config.C
KubernetesVersion: getKubernetesVersion(&cc),
Name: kubeNodeName,
ControlPlane: true,
ApiEndpointServer: true,
Worker: true,
}
cc.Nodes = []config.Node{cp}
Expand Down
16 changes: 16 additions & 0 deletions cmd/minikube/cmd/start_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,22 @@ func upgradeExistingConfig(cc *config.ClusterConfig) {
cc.KicBaseImage = viper.GetString(kicBaseImage)
klog.Infof("config upgrade: KicBaseImage=%s", cc.KicBaseImage)
}

needTagApiEndpointServer := true
for i := range cc.Nodes {
if cc.Nodes[i].ApiEndpointServer {
needTagApiEndpointServer = false
break
}
}
if needTagApiEndpointServer {
for i := range cc.Nodes {
if cc.Nodes[i].ControlPlane {
cc.Nodes[i].ApiEndpointServer = true
break
}
}
}
}

// updateExistingConfigFromFlags will update the existing config from the flags - used on a second start
Expand Down
31 changes: 20 additions & 11 deletions cmd/minikube/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,18 @@ var (

// Status holds string representations of component states
type Status struct {
Name string
Host string
Kubelet string
Name string
Host string
Kubelet string

// APIServer indicates kube-apiserver status
APIServer string
Kubeconfig string
Worker bool
TimeToStop string
// IsAPIEndpoint indicates primary control plane (api endpoint)
IsAPIEndpoint bool
IP string
}

// ClusterState holds a cluster state representation
Expand Down Expand Up @@ -177,18 +182,20 @@ const (
clusterNotRunningStatusFlag = 1 << 1
k8sNotRunningStatusFlag = 1 << 2
defaultStatusFormat = `{{.Name}}
type: Control Plane
type: Control Plane{{if .IsAPIEndpoint}} (Primary){{end}}
host: {{.Host}}
kubelet: {{.Kubelet}}
apiserver: {{.APIServer}}
kubeconfig: {{.Kubeconfig}}
timeToStop: {{.TimeToStop}}
IP: {{.IP}}
`
workerStatusFormat = `{{.Name}}
type: Worker
host: {{.Host}}
kubelet: {{.Kubelet}}
IP: {{.IP}}
`
)
Expand Down Expand Up @@ -304,13 +311,15 @@ func nodeStatus(api libmachine.API, cc config.ClusterConfig, n config.Node) (*St
name := driver.MachineName(cc, n)

st := &Status{
Name: name,
Host: Nonexistent,
APIServer: Nonexistent,
Kubelet: Nonexistent,
Kubeconfig: Nonexistent,
Worker: !controlPlane,
TimeToStop: Nonexistent,
Name: name,
Host: Nonexistent,
APIServer: Nonexistent,
Kubelet: Nonexistent,
Kubeconfig: Nonexistent,
Worker: !controlPlane,
TimeToStop: Nonexistent,
IsAPIEndpoint: n.ApiEndpointServer,
IP: n.IP,
}

hs, err := machine.Status(api, name)
Expand Down
17 changes: 17 additions & 0 deletions cmd/minikube/cmd/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/mustload"
"k8s.io/minikube/pkg/minikube/node"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/out/register"
"k8s.io/minikube/pkg/minikube/reason"
Expand Down Expand Up @@ -137,14 +138,30 @@ func stopProfile(profile string) int {
api, cc := mustload.Partial(profile)
defer api.Close()

primaryMachineName := ""
for _, n := range cc.Nodes {
machineName := driver.MachineName(*cc, n)

if n.ApiEndpointServer {
// Skip because we need to update etcd members
primaryMachineName = machineName
continue
} else if n.ControlPlane {
// Remove from primary control plane
node.MustReset(*cc, n, api, machineName)
}

nonexistent := stop(api, machineName)
if !nonexistent {
stoppedNodes++
}
}
if primaryMachineName != "" {
nonexistent := stop(api, primaryMachineName)
if !nonexistent {
stoppedNodes++
}
}

if err := killMountProcess(); err != nil {
out.WarningT("Unable to kill mount process: {{.error}}", out.V{"error": err})
Expand Down
2 changes: 1 addition & 1 deletion pkg/minikube/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Bootstrapper interface {
WaitForNode(config.ClusterConfig, config.Node, time.Duration) error
JoinCluster(config.ClusterConfig, config.Node, string) error
UpdateNode(config.ClusterConfig, config.Node, cruntime.Manager) error
GenerateToken(config.ClusterConfig) (string, error)
GenerateToken(config.ClusterConfig, bool) (string, error)
// LogCommands returns a map of log type to a command which will display that log.
LogCommands(config.ClusterConfig, LogOptions) map[string]string
SetupCerts(config.KubernetesConfig, config.Node) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/minikube/bootstrapper/bsutil/extraconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var KubeadmExtraArgsAllowed = map[int][]string{
"kubeconfig-dir",
"node-name",
"cri-socket",
"experimental-upload-certs",
"upload-certs",
"certificate-key",
"rootfs",
"skip-phases",
Expand Down
2 changes: 1 addition & 1 deletion pkg/minikube/bootstrapper/bsutil/kubeadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func GenerateKubeadmYAML(cc config.ClusterConfig, n config.Node, r cruntime.Mana
CgroupDriver: cgroupDriver,
ClientCAFile: path.Join(vmpath.GuestKubernetesCertsDir, "ca.crt"),
StaticPodPath: vmpath.GuestManifestsDir,
ControlPlaneAddress: constants.ControlPlaneAlias,
ControlPlaneAddress: constants.ApiEndpointAlias,
KubeProxyOptions: createKubeProxyOptions(k8s.ExtraOptions),
}

Expand Down
10 changes: 7 additions & 3 deletions pkg/minikube/bootstrapper/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,13 @@ func SetupCerts(cmd command.Runner, k8s config.KubernetesConfig, n config.Node)
copyableFiles = append(copyableFiles, certFile)
}

endpoint := net.JoinHostPort(constants.ApiEndpointAlias, fmt.Sprint(k8s.NodePort))
if n.ApiEndpointServer {
endpoint = net.JoinHostPort("localhost", fmt.Sprint(n.Port))
}
kcs := &kubeconfig.Settings{
ClusterName: n.Name,
ClusterServerAddress: fmt.Sprintf("https://%s", net.JoinHostPort("localhost", fmt.Sprint(n.Port))),
ClusterServerAddress: fmt.Sprintf("https://%s", endpoint),
ClientCertificate: path.Join(vmpath.GuestKubernetesCertsDir, "apiserver.crt"),
ClientKey: path.Join(vmpath.GuestKubernetesCertsDir, "apiserver.key"),
CertificateAuthority: path.Join(vmpath.GuestKubernetesCertsDir, "ca.crt"),
Expand Down Expand Up @@ -183,7 +187,7 @@ func generateSharedCACerts() (CACerts, error) {
func generateProfileCerts(k8s config.KubernetesConfig, n config.Node, ccs CACerts) ([]string, error) {

// Only generate these certs for the api server
if !n.ControlPlane {
if !n.ApiEndpointServer {
return []string{}, nil
}

Expand All @@ -201,7 +205,7 @@ func generateProfileCerts(k8s config.KubernetesConfig, n config.Node, ccs CACert
apiServerIPs = append(apiServerIPs, net.ParseIP(v))
}

apiServerNames := append(k8s.APIServerNames, k8s.APIServerName, constants.ControlPlaneAlias)
apiServerNames := append(k8s.APIServerNames, k8s.APIServerName, constants.ApiEndpointAlias)
apiServerAlternateNames := append(
apiServerNames,
util.GetAlternateDNS(k8s.DNSDomain)...)
Expand Down

0 comments on commit 60b77a7

Please sign in to comment.