From 98d321f1c10776a1b5a11886478ed10ba5caea29 Mon Sep 17 00:00:00 2001 From: Louis Taylor Date: Tue, 24 Jul 2018 18:02:26 +0100 Subject: [PATCH] Converge machines --- pkg/wing/convergemachine.go | 85 ++++++++++++++++++++++++++++++++++ pkg/wing/instancecontroller.go | 4 -- pkg/wing/puppet.go | 77 ------------------------------ pkg/wing/wing.go | 3 -- 4 files changed, 85 insertions(+), 84 deletions(-) create mode 100644 pkg/wing/convergemachine.go diff --git a/pkg/wing/convergemachine.go b/pkg/wing/convergemachine.go new file mode 100644 index 0000000000..9275725010 --- /dev/null +++ b/pkg/wing/convergemachine.go @@ -0,0 +1,85 @@ +package wing + +import ( + "fmt" + + "github.com/jetstack/tarmak/pkg/apis/wing/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (w *Wing) convergeMachine() error { + machineAPI := w.clientset.WingV1alpha1().Machines(w.flags.ClusterName) + machine, err := machineAPI.Get( + w.flags.InstanceName, + metav1.GetOptions{}, + ) + if err != nil { + if kerr, ok := err.(*apierrors.StatusError); ok && kerr.ErrStatus.Reason == metav1.StatusReasonNotFound { + machine = &v1alpha1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: w.flags.InstanceName, + }, + Status: v1alpha1.MachineStatus{ + Converged: false, + }, + } + _, err := machineAPI.Create(machine) + if err != nil { + return fmt.Errorf("error creating machine: %s", err) + } + return nil + } + return fmt.Errorf("error get existing machine: %s", err) + } + + if machine.Status.Converged { + w.log.Infof("Machine already converged: %s", machine.Name) + return nil + } + + puppetTarget := machine.Spec.PuppetTargetRef + if puppetTarget == "" { + w.log.Warn("no puppet target for machine: ", machine.Name) + return nil + } + + // FIXME: this shouldn't be done on the wing agent + jobName := fmt.Sprintf("%s-%s", w.flags.InstanceName, puppetTarget) + jobsAPI := w.clientset.WingV1alpha1().WingJobs(w.flags.ClusterName) + job, err := jobsAPI.Get( + jobName, + metav1.GetOptions{}, + ) + if err != nil { + if kerr, ok := err.(*apierrors.StatusError); ok && kerr.ErrStatus.Reason == metav1.StatusReasonNotFound { + job = &v1alpha1.WingJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + }, + Spec: &v1alpha1.WingJobSpec{ + InstanceName: machine.Name, + PuppetTargetRef: puppetTarget, + Operation: "apply", + RequestTimestamp: metav1.Now(), + }, + Status: &v1alpha1.WingJobStatus{}, + } + _, err := jobsAPI.Create(job) + if err != nil { + return fmt.Errorf("error creating WingJob: %s", err) + } + return nil + } + return fmt.Errorf("error get existing WingJob: %s", err) + } + + machineCopy := machine.DeepCopy() + machineCopy.Status.Converged = true + _, err = machineAPI.Update(machineCopy) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/wing/instancecontroller.go b/pkg/wing/instancecontroller.go index 066181ecb6..ebf1e9f9b6 100644 --- a/pkg/wing/instancecontroller.go +++ b/pkg/wing/instancecontroller.go @@ -51,10 +51,6 @@ func (c *MachineController) processNextItem() bool { } func (c *MachineController) syncMachine(key string) error { - - // ensure only one converge at a time - c.wing.convergeWG.Wait() - obj, exists, err := c.indexer.GetByKey(key) if err != nil { c.log.Errorf("Fetching object with key %s from store failed with %v", key, err) diff --git a/pkg/wing/puppet.go b/pkg/wing/puppet.go index 0b0d824f09..68e26209ff 100644 --- a/pkg/wing/puppet.go +++ b/pkg/wing/puppet.go @@ -19,7 +19,6 @@ import ( "github.com/cenkalti/backoff" "github.com/docker/docker/pkg/archive" "golang.org/x/net/context" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/jetstack/tarmak/pkg/apis/wing/v1alpha1" @@ -143,82 +142,6 @@ func (w *Wing) runPuppet(job *v1alpha1.WingJob) error { return nil } -func (w *Wing) convergeMachine() error { - machineAPI := w.clientset.WingV1alpha1().Machines(w.flags.ClusterName) - machine, err := machineAPI.Get( - w.flags.InstanceName, - metav1.GetOptions{}, - ) - if err != nil { - if kerr, ok := err.(*apierrors.StatusError); ok && kerr.ErrStatus.Reason == metav1.StatusReasonNotFound { - machine = &v1alpha1.Machine{ - ObjectMeta: metav1.ObjectMeta{ - Name: w.flags.InstanceName, - }, - Status: v1alpha1.MachineStatus{ - Converged: false, - }, - } - _, err := machineAPI.Create(machine) - if err != nil { - return fmt.Errorf("error creating machine: %s", err) - } - return nil - } - return fmt.Errorf("error get existing machine: %s", err) - } - - if machine.Status.Converged { - w.log.Infof("Machine already converged: ", machine.Name) - return nil - } - - puppetTarget := machine.Spec.PuppetTargetRef - if puppetTarget == "" { - w.log.Warn("no puppet target for machine: ", machine.Name) - return nil - } - - // FIXME: this shouldn't be done on the wing agent - jobName := fmt.Sprintf("%s-%s", w.flags.InstanceName, puppetTarget) - jobsAPI := w.clientset.WingV1alpha1().WingJobs(w.flags.ClusterName) - job, err := jobsAPI.Get( - jobName, - metav1.GetOptions{}, - ) - if err != nil { - if kerr, ok := err.(*apierrors.StatusError); ok && kerr.ErrStatus.Reason == metav1.StatusReasonNotFound { - job = &v1alpha1.WingJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: jobName, - }, - Spec: &v1alpha1.WingJobSpec{ - InstanceName: machine.Name, - PuppetTargetRef: puppetTarget, - Operation: "apply", - RequestTimestamp: metav1.Now(), - }, - Status: &v1alpha1.WingJobStatus{}, - } - _, err := jobsAPI.Create(job) - if err != nil { - return fmt.Errorf("error creating WingJob: %s", err) - } - return nil - } - return fmt.Errorf("error get existing WingJob: %s", err) - } - - machineCopy := machine.DeepCopy() - machineCopy.Status.Converged = true - _, err = machineAPI.Update(machineCopy) - if err != nil { - return err - } - - return nil -} - func (w *Wing) converge(job *v1alpha1.WingJob) { w.convergeWG.Add(1) defer w.convergeWG.Done() diff --git a/pkg/wing/wing.go b/pkg/wing/wing.go index a53c19de4a..a3705af0cb 100644 --- a/pkg/wing/wing.go +++ b/pkg/wing/wing.go @@ -88,9 +88,6 @@ func (w *Wing) Run(args []string) error { signal.Notify(signalCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) w.signalHandler(signalCh) - // run converge on instance after first start - go w.convergeMachine() - // start watching for API server events that trigger applies w.watchForNotifications()