Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
Plumb Context through 'plan' steps
Browse files Browse the repository at this point in the history
Doesn't do anything different as yet, but this will be useful for
tracing, or for cancelling a command.
  • Loading branch information
bboreham committed Sep 29, 2020
1 parent 6fdf5e9 commit be40f31
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 64 deletions.
13 changes: 7 additions & 6 deletions cmd/wksctl/apply/apply.go
@@ -1,6 +1,7 @@
package apply

import (
"context"
"path/filepath"
"strings"

Expand All @@ -22,7 +23,7 @@ import (
var Cmd = &cobra.Command{
Use: "apply",
Short: "Create or update a Kubernetes cluster",
RunE: func(_ *cobra.Command, _ []string) error { a := Applier{&globalParams}; return a.Apply() },
RunE: func(cmd *cobra.Command, _ []string) error { a := Applier{&globalParams}; return a.Apply(cmd.Context()) },
}

type Params struct {
Expand Down Expand Up @@ -68,7 +69,7 @@ type Applier struct {
Params *Params
}

func (a *Applier) Apply() error {
func (a *Applier) Apply(ctx context.Context) error {
var clusterPath, machinesPath string

// TODO: deduplicate clusterPath/machinesPath evaluation between here and other places
Expand All @@ -92,18 +93,18 @@ func (a *Applier) Apply() error {
}
}

return a.initiateCluster(clusterPath, machinesPath)
return a.initiateCluster(ctx, clusterPath, machinesPath)
}

func (a *Applier) initiateCluster(clusterManifestPath, machinesManifestPath string) error {
func (a *Applier) initiateCluster(ctx context.Context, clusterManifestPath, machinesManifestPath string) error {
sp := specs.NewFromPaths(clusterManifestPath, machinesManifestPath)
sshClient, err := ssh.NewClientForMachine(sp.MasterSpec, sp.ClusterSpec.User, a.Params.sshKeyPath, log.GetLevel() > log.InfoLevel)

if err != nil {
return errors.Wrap(err, "failed to create SSH client")
}
defer sshClient.Close()
installer, err := capeios.Identify(sshClient)
installer, err := capeios.Identify(ctx, sshClient)
if err != nil {
return errors.Wrapf(err, "failed to identify operating system for seed node (%s)", sp.GetMasterPublicAddress())
}
Expand Down Expand Up @@ -155,7 +156,7 @@ func (a *Applier) initiateCluster(clusterManifestPath, machinesManifestPath stri
}
}

if err := wksos.SetupSeedNode(installer, wksos.SeedNodeParams{
if err := wksos.SetupSeedNode(ctx, installer, wksos.SeedNodeParams{
PublicIP: sp.GetMasterPublicAddress(),
PrivateIP: sp.GetMasterPrivateAddress(),
ServicesCIDRBlocks: sp.Cluster.Spec.ClusterNetwork.Services.CIDRBlocks,
Expand Down
7 changes: 4 additions & 3 deletions cmd/wksctl/kubeconfig/kubeconfig.go
@@ -1,6 +1,7 @@
package kubeconfig

import (
"context"
"fmt"
"path/filepath"

Expand Down Expand Up @@ -95,10 +96,10 @@ func kubeconfigRun(cmd *cobra.Command, args []string) error {
}
}

return writeKubeconfig(clusterPath, machinesPath)
return writeKubeconfig(cmd.Context(), clusterPath, machinesPath)
}

func writeKubeconfig(cpath, mpath string) error {
func writeKubeconfig(ctx context.Context, cpath, mpath string) error {
var wksHome string
var err error
var configPath string
Expand All @@ -120,7 +121,7 @@ func writeKubeconfig(cpath, mpath string) error {
configPath = clientcmd.RecommendedHomeFile
}

configStr, err := config.GetRemoteKubeconfig(sp, kubeconfigOptions.sshKeyPath, kubeconfigOptions.verbose, kubeconfigOptions.skipTLSVerify)
configStr, err := config.GetRemoteKubeconfig(ctx, sp, kubeconfigOptions.sshKeyPath, kubeconfigOptions.verbose, kubeconfigOptions.skipTLSVerify)
if err != nil {
return errors.Wrapf(err, "failed to get remote kubeconfig")
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/wksctl/plan/view/view.go
@@ -1,6 +1,7 @@
package view

import (
"context"
"fmt"
"path/filepath"

Expand Down Expand Up @@ -79,18 +80,18 @@ func planRun(cmd *cobra.Command, args []string) error {
}
}

return displayPlan(clusterPath, machinesPath)
return displayPlan(cmd.Context(), clusterPath, machinesPath)
}

func displayPlan(clusterManifestPath, machinesManifestPath string) error {
func displayPlan(ctx context.Context, clusterManifestPath, machinesManifestPath string) error {
// TODO: reuse the actual plan created by `wksctl apply`, rather than trying to construct a similar plan and printing it.
sp := specs.NewFromPaths(clusterManifestPath, machinesManifestPath)
sshClient, err := ssh.NewClientForMachine(sp.MasterSpec, sp.ClusterSpec.User, viewOptions.sshKeyPath, viewOptions.verbose)
if err != nil {
return errors.Wrap(err, "failed to create SSH client: ")
}
defer sshClient.Close()
installer, err := capeios.Identify(sshClient)
installer, err := capeios.Identify(ctx, sshClient)
if err != nil {
return errors.Wrapf(err, "failed to identify operating system for seed node (%s)", sp.GetMasterPublicAddress())
}
Expand Down Expand Up @@ -125,7 +126,7 @@ func displayPlan(clusterManifestPath, machinesManifestPath string) error {
AddonNamespaces: manifest.DefaultAddonNamespaces,
ConfigDirectory: configDir,
}
plan, err := wksos.CreateSeedNodeSetupPlan(installer, params)
plan, err := wksos.CreateSeedNodeSetupPlan(ctx, installer, params)
if err != nil {
return errors.Wrap(err, "could not generate plan")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -22,7 +22,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
github.com/thanhpk/randstr v1.0.4
github.com/weaveworks/cluster-api-provider-existinginfra v0.0.2
github.com/weaveworks/cluster-api-provider-existinginfra v0.0.3
github.com/weaveworks/footloose v0.0.0-20200609124411-8f3df89ea188
github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab
github.com/weaveworks/launcher v0.0.0-20180824102238-59a4fcc32c9c
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -797,6 +797,8 @@ github.com/vishvananda/netns v0.0.0-20171111001504-be1fbeda1936/go.mod h1:ZjcWmF
github.com/vmware/govmomi v0.20.3/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/weaveworks/cluster-api-provider-existinginfra v0.0.2 h1:Z/OUvMxnX4KjyIZgXUKbveTZPKKr9J2GorwEeIs2Wkk=
github.com/weaveworks/cluster-api-provider-existinginfra v0.0.2/go.mod h1:1+bY5QfDcvqYDHCLF7a57X+/Zt3OSqyL/x7wayf+Wbo=
github.com/weaveworks/cluster-api-provider-existinginfra v0.0.3 h1:kOACHKpyeJYFPQ4pk0TlHLkrswAtREuPI0+6XC6yCVM=
github.com/weaveworks/cluster-api-provider-existinginfra v0.0.3/go.mod h1:1+bY5QfDcvqYDHCLF7a57X+/Zt3OSqyL/x7wayf+Wbo=
github.com/weaveworks/footloose v0.0.0-20200609124411-8f3df89ea188 h1:BuiVM+91YRjWUSMvF93A7Sm7s7IHrxjrPcc/5boO2V0=
github.com/weaveworks/footloose v0.0.0-20200609124411-8f3df89ea188/go.mod h1:nxDdCjg1kb5+luh4mUCp+mtBZIWrhzoLIArrD99y+Sc=
github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab h1:mW+hgchD9qUUBqnuaDBj7BkcpFPk/FxeFcUFI5lvvUw=
Expand Down
25 changes: 13 additions & 12 deletions pkg/apis/wksprovider/machine/os/os.go
Expand Up @@ -2,6 +2,7 @@ package os

import (
"bytes"
"context"
"crypto/rsa"
"encoding/base64"
"fmt"
Expand Down Expand Up @@ -168,21 +169,21 @@ func (params SeedNodeParams) GetAddonNamespace(name string) string {
// SetupSeedNode installs Kubernetes on this machine, and store the provided
// manifests in the API server, so that the rest of the cluster can then be
// set up by the WKS controller.
func SetupSeedNode(o *capeios.OS, params SeedNodeParams) error {
p, err := CreateSeedNodeSetupPlan(o, params)
func SetupSeedNode(ctx context.Context, o *capeios.OS, params SeedNodeParams) error {
p, err := CreateSeedNodeSetupPlan(ctx, o, params)
if err != nil {
return err
}
return applySeedNodePlan(o, p)
return applySeedNodePlan(ctx, o, p)
}

// CreateSeedNodeSetupPlan constructs the seed node plan used to setup the initial node
// prior to turning control over to wks-controller
func CreateSeedNodeSetupPlan(o *capeios.OS, params SeedNodeParams) (*plan.Plan, error) {
func CreateSeedNodeSetupPlan(ctx context.Context, o *capeios.OS, params SeedNodeParams) (*plan.Plan, error) {
if err := params.Validate(); err != nil {
return nil, err
}
cfg, err := envcfg.GetEnvSpecificConfig(o.PkgType, params.Namespace, params.KubeletConfig.CloudProvider, o.Runner)
cfg, err := envcfg.GetEnvSpecificConfig(ctx, o.PkgType, params.Namespace, params.KubeletConfig.CloudProvider, o.Runner)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -213,7 +214,7 @@ func CreateSeedNodeSetupPlan(o *capeios.OS, params SeedNodeParams) (*plan.Plan,
return nil, err
}

criRes := capeirecipe.BuildCRIPlan(&cluster.Spec.CRI, cfg, o.PkgType)
criRes := capeirecipe.BuildCRIPlan(ctx, &cluster.Spec.CRI, cfg, o.PkgType)
b.AddResource("install:cri", criRes, plan.DependOn("install:config"))

k8sRes := capeirecipe.BuildK8SPlan(kubernetesVersion, params.KubeletConfig.NodeIP, cfg.SELinuxInstalled, cfg.SetSELinuxPermissive, cfg.DisableSwap, cfg.LockYUMPkgs, o.PkgType, params.KubeletConfig.CloudProvider, params.KubeletConfig.ExtraArguments)
Expand Down Expand Up @@ -296,7 +297,7 @@ func CreateSeedNodeSetupPlan(o *capeios.OS, params SeedNodeParams) (*plan.Plan,
}

// Set plan as an annotation on node, just like controller does
seedNodePlan, err := seedNodeSetupPlan(o, params, &cluster.Spec, configMaps, authConfigMap, pemSecretResources, kubernetesVersion, kubernetesNamespace)
seedNodePlan, err := seedNodeSetupPlan(ctx, o, params, &cluster.Spec, configMaps, authConfigMap, pemSecretResources, kubernetesVersion, kubernetesNamespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -539,7 +540,7 @@ func addClusterAPICRDs(b *plan.Builder) ([]string, error) {
return crdIDs, nil
}

func seedNodeSetupPlan(o *capeios.OS, params SeedNodeParams, providerSpec *capeiv1alpha3.ClusterSpec, providerConfigMaps map[string]*v1.ConfigMap, authConfigMap *v1.ConfigMap, secretResources map[string]*secretResourceSpec, kubernetesVersion, kubernetesNamespace string) (*plan.Plan, error) {
func seedNodeSetupPlan(ctx context.Context, o *capeios.OS, params SeedNodeParams, providerSpec *capeiv1alpha3.ClusterSpec, providerConfigMaps map[string]*v1.ConfigMap, authConfigMap *v1.ConfigMap, secretResources map[string]*secretResourceSpec, kubernetesVersion, kubernetesNamespace string) (*plan.Plan, error) {
secrets := map[string]capeiresource.SecretData{}
for k, v := range secretResources {
secrets[k] = v.decrypted
Expand All @@ -559,17 +560,17 @@ func seedNodeSetupPlan(o *capeios.OS, params SeedNodeParams, providerSpec *capei
AddonNamespaces: params.AddonNamespaces,
ControlPlaneEndpoint: providerSpec.ControlPlaneEndpoint,
}
return o.CreateNodeSetupPlan(nodeParams)
return o.CreateNodeSetupPlan(ctx, nodeParams)
}

func applySeedNodePlan(o *capeios.OS, p *plan.Plan) error {
err := p.Undo(o.Runner, plan.EmptyState)
func applySeedNodePlan(ctx context.Context, o *capeios.OS, p *plan.Plan) error {
err := p.Undo(ctx, o.Runner, plan.EmptyState)
if err != nil {
log.Infof("Pre-plan cleanup failed:\n%s\n", err)
return err
}

_, err = p.Apply(o.Runner, plan.EmptyDiff())
_, err = p.Apply(ctx, o.Runner, plan.EmptyDiff())
if err != nil {
log.Errorf("Apply of Plan failed:\n%s\n", err)
return err
Expand Down
5 changes: 3 additions & 2 deletions pkg/kubernetes/config/kubeconfig.go
@@ -1,6 +1,7 @@
package config

import (
"context"
"fmt"
"net/url"
"regexp"
Expand Down Expand Up @@ -69,15 +70,15 @@ func Sanitize(configStr string, params Params) (string, error) {
}

// GetRemoteKubeconfig retrieves Kubernetes configuration from a master node of the cluster
func GetRemoteKubeconfig(sp *specs.Specs, sshKeyPath string, verbose, skipTLSVerify bool) (string, error) {
func GetRemoteKubeconfig(ctx context.Context, sp *specs.Specs, sshKeyPath string, verbose, skipTLSVerify bool) (string, error) {
sshClient, err := ssh.NewClientForMachine(sp.MasterSpec, sp.ClusterSpec.User, sshKeyPath, verbose)
if err != nil {
return "", errors.Wrap(err, "failed to create SSH client: ")
}
defer sshClient.Close()

runner := sudo.Runner{Runner: sshClient}
configStr, err := runner.RunCommand("cat /etc/kubernetes/admin.conf", nil)
configStr, err := runner.RunCommand(ctx, "cat /etc/kubernetes/admin.conf", nil)
if err != nil {
return "", errors.Wrap(err, "failed to retrieve Kubernetes configuration")
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/plan/resource/common.go
@@ -1,15 +1,16 @@
package resource

import (
"context"
"fmt"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/cluster-api-provider-existinginfra/pkg/plan"
)

func removeFile(remotePath string, runner plan.Runner) error {
if stdouterr, err := runner.RunCommand(fmt.Sprintf("rm -f %q", remotePath), nil); err != nil {
func removeFile(ctx context.Context, remotePath string, runner plan.Runner) error {
if stdouterr, err := runner.RunCommand(ctx, fmt.Sprintf("rm -f %q", remotePath), nil); err != nil {
log.WithField("stdouterr", stdouterr).WithField("path", remotePath).Debugf("failed to delete file")
return errors.Wrapf(err, "failed to delete %q", remotePath)
}
Expand Down
27 changes: 14 additions & 13 deletions pkg/plan/resource/kubeadm_init.go
Expand Up @@ -2,6 +2,7 @@ package resource

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"strings"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (ki *KubeadmInit) State() capeiplan.State {
// Apply implements plan.Resource.
// TODO: find a way to make this idempotent.
// TODO: should such a resource be split into smaller resources?
func (ki *KubeadmInit) Apply(runner capeiplan.Runner, diff capeiplan.Diff) (bool, error) {
func (ki *KubeadmInit) Apply(ctx context.Context, runner capeiplan.Runner, diff capeiplan.Diff) (bool, error) {
log.Debug("Initializing Kubernetes cluster")

sshKey, err := ssh.ReadPrivateKey(ki.SSHKeyPath)
Expand Down Expand Up @@ -140,12 +141,12 @@ func (ki *KubeadmInit) Apply(runner capeiplan.Runner, diff capeiplan.Diff) (bool
configBytes := buf.Bytes()

remotePath := "/tmp/wks_kubeadm_init_config.yaml"
if err = scripts.WriteFile(configBytes, remotePath, 0660, runner); err != nil {
if err = scripts.WriteFile(ctx, configBytes, remotePath, 0660, runner); err != nil {
return false, errors.Wrap(err, "failed to upload kubeadm's configuration")
}
log.WithField("yaml", string(configBytes)).Debug("uploaded kubeadm's configuration")
//nolint:errcheck
defer removeFile(remotePath, runner) // TODO: Deferred error checking
defer removeFile(ctx, remotePath, runner) // TODO: Deferred error checking

var stdOutErr string
p := buildKubeadmInitPlan(
Expand All @@ -154,7 +155,7 @@ func (ki *KubeadmInit) Apply(runner capeiplan.Runner, diff capeiplan.Diff) (bool
ki.UseIPTables,
ki.KubernetesVersion,
&stdOutErr)
_, err = p.Apply(runner, capeiplan.EmptyDiff())
_, err = p.Apply(ctx, runner, capeiplan.EmptyDiff())
if err != nil {
return false, errors.Wrap(err, "failed to initialize Kubernetes cluster with kubeadm")
}
Expand All @@ -174,14 +175,14 @@ func (ki *KubeadmInit) Apply(runner capeiplan.Runner, diff capeiplan.Diff) (bool
return false, err
}

if err := ki.kubectlApply("01_namespace.yaml", namespace, runner); err != nil {
if err := ki.kubectlApply(ctx, "01_namespace.yaml", namespace, runner); err != nil {
return false, err
}

if err := ki.kubectlApply("02_rbac.yaml", namespace, runner); err != nil {
if err := ki.kubectlApply(ctx, "02_rbac.yaml", namespace, runner); err != nil {
return false, err
}
return true, ki.applySecretWith(sshKey, caCertHash, certKey, namespace, runner)
return true, ki.applySecretWith(ctx, sshKey, caCertHash, certKey, namespace, runner)
}

func (ki *KubeadmInit) updateManifestNamespace(fileName, namespace string) ([]byte, error) {
Expand All @@ -196,12 +197,12 @@ func (ki *KubeadmInit) updateManifestNamespace(fileName, namespace string) ([]by
return c, nil
}

func (ki *KubeadmInit) kubectlApply(fileName, namespace string, runner capeiplan.Runner) error {
func (ki *KubeadmInit) kubectlApply(ctx context.Context, fileName, namespace string, runner capeiplan.Runner) error {
content, err := ki.updateManifestNamespace(fileName, namespace)
if err != nil {
return errors.Wrap(err, "Failed to upate manifest namespace")
}
return RunKubectlApply(runner, KubectlApplyArgs{Content: content}, fileName)
return RunKubectlApply(ctx, runner, KubectlApplyArgs{Content: content}, fileName)
}

func (ki *KubeadmInit) manifestContent(fileName string) ([]byte, error) {
Expand All @@ -216,7 +217,7 @@ func (ki *KubeadmInit) manifestContent(fileName string) ([]byte, error) {
return content, nil
}

func (ki *KubeadmInit) applySecretWith(sshKey []byte, discoveryTokenCaCertHash, certKey, namespace string, runner capeiplan.Runner) error {
func (ki *KubeadmInit) applySecretWith(ctx context.Context, sshKey []byte, discoveryTokenCaCertHash, certKey, namespace string, runner capeiplan.Runner) error {
log.Info("adding SSH key to WKS secret and applying its manifest")
fileName := "03_secrets.yaml"
secret, err := ki.deserializeSecret(fileName, namespace)
Expand All @@ -234,7 +235,7 @@ func (ki *KubeadmInit) applySecretWith(sshKey []byte, discoveryTokenCaCertHash,
if err != nil {
return errors.Wrap(err, "failed to serialize manifest")
}
return RunKubectlApply(runner, KubectlApplyArgs{Content: bytes}, fileName)
return RunKubectlApply(ctx, runner, KubectlApplyArgs{Content: bytes}, fileName)
}

func (ki *KubeadmInit) deserializeSecret(fileName, namespace string) (*corev1.Secret, error) {
Expand All @@ -250,14 +251,14 @@ func (ki *KubeadmInit) deserializeSecret(fileName, namespace string) (*corev1.Se
}

// Undo implements plan.Resource.
func (ki *KubeadmInit) Undo(runner capeiplan.Runner, current capeiplan.State) error {
func (ki *KubeadmInit) Undo(ctx context.Context, runner capeiplan.Runner, current capeiplan.State) error {
remotePath := "/tmp/wks_kubeadm_init_config.yaml"
var ignored string
return buildKubeadmInitPlan(
remotePath,
strings.Join(ki.IgnorePreflightErrors, ","),
ki.UseIPTables, ki.KubernetesVersion, &ignored).Undo(
runner, capeiplan.EmptyState)
ctx, runner, capeiplan.EmptyState)
}

// buildKubeadmInitPlan builds a plan for kubeadm init command.
Expand Down

0 comments on commit be40f31

Please sign in to comment.