Skip to content

Commit

Permalink
Plumb rest config through from manager. (#1334)
Browse files Browse the repository at this point in the history
Plumb the config through alongside the client object (since they convey
the same information really) all the way to pipe task.

This way we use the same explicit config everywhere, rather than rely on
config.GetConfig to guess the correct one. This is especially important
when running the controller outside of a cluster.
  • Loading branch information
porridge committed Feb 5, 2020
1 parent 1c0f33c commit e2bc07e
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 13 deletions.
1 change: 1 addition & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func main() {
log.Print("Setting up instance controller")
err = (&instance.Reconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Recorder: mgr.GetEventRecorderFor("instance-controller"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr)
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/instance/instance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/rest"

"github.com/thoas/go-funk"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -59,6 +60,7 @@ const (
// Reconciler reconciles an Instance object.
type Reconciler struct {
client.Client
Config *rest.Config
Recorder record.EventRecorder
Scheme *runtime.Scheme
}
Expand Down Expand Up @@ -234,7 +236,7 @@ func (r *Reconciler) Reconcile(request ctrl.Request) (ctrl.Result, error) {
return reconcile.Result{}, err
}
log.Printf("InstanceController: Going to proceed in execution of active plan %s on instance %s/%s", activePlan.Name, instance.Namespace, instance.Name)
newStatus, err := workflow.Execute(activePlan, metadata, r.Client, &renderer.DefaultEnhancer{Scheme: r.Scheme}, time.Now())
newStatus, err := workflow.Execute(activePlan, metadata, r.Client, r.Config, &renderer.DefaultEnhancer{Scheme: r.Scheme}, time.Now())

// ---------- 5. Update status of instance after the execution proceeded ----------
if newStatus != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/instance/instance_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func startTestManager(t *testing.T) (chan struct{}, *sync.WaitGroup, client.Clie
assert.Nil(t, err, "Error when creating manager")
err = (&Reconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Recorder: mgr.GetEventRecorderFor("instance-controller"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr)
Expand Down
2 changes: 2 additions & 0 deletions pkg/engine/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"regexp"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
Expand All @@ -15,6 +16,7 @@ import (
// Context is a engine.task execution context containing k8s client, templates parameters etc.
type Context struct {
Client client.Client
Config *rest.Config
Enhancer renderer.Enhancer
Meta renderer.Metadata
Templates map[string]string // Raw templates
Expand Down
13 changes: 3 additions & 10 deletions pkg/engine/task/task_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/yaml"

"github.com/kudobuilder/kudo/pkg/engine/renderer"
Expand Down Expand Up @@ -266,11 +265,6 @@ func pipePod(pod *corev1.Pod, name string) (string, error) {
}

func copyFiles(fs afero.Fs, ff []PipeFile, pod *corev1.Pod, ctx Context) error {
restCfg, err := config.GetConfig()
if err != nil {
return fatalExecutionError(fmt.Errorf("failed to fetch cluster REST config: %v", err), pipeTaskError, ctx.Meta)
}

var g errgroup.Group

for _, f := range ff {
Expand All @@ -280,7 +274,7 @@ func copyFiles(fs afero.Fs, ff []PipeFile, pod *corev1.Pod, ctx Context) error {
// Check the size of the pipe file first. K87 has a inherent limit on the size of
// Secret/ConfigMap, so we avoid unnecessary copying of files that are too big by
// checking its size first.
size, err := podexec.FileSize(f.File, pod, pipePodContainerName, restCfg)
size, err := podexec.FileSize(f.File, pod, pipePodContainerName, ctx.Config)
if err != nil {
// Any remote command exit code > 0 is treated as a fatal error since retrying it doesn't make sense
if podexec.HasCommandFailed(err) {
Expand All @@ -293,7 +287,7 @@ func copyFiles(fs afero.Fs, ff []PipeFile, pod *corev1.Pod, ctx Context) error {
return fatalExecutionError(fmt.Errorf("pipe file %s size %d exceeds maximum file size of %d bytes", f.File, size, maxPipeFileSize), pipeTaskError, ctx.Meta)
}

if err = podexec.DownloadFile(fs, f.File, pod, pipePodContainerName, restCfg); err != nil {
if err = podexec.DownloadFile(fs, f.File, pod, pipePodContainerName, ctx.Config); err != nil {
// Any remote command exit code > 0 is treated as a fatal error since retrying it doesn't make sense
if podexec.HasCommandFailed(err) {
return fatalExecutionError(err, pipeTaskError, ctx.Meta)
Expand All @@ -304,8 +298,7 @@ func copyFiles(fs afero.Fs, ff []PipeFile, pod *corev1.Pod, ctx Context) error {
})
}

err = g.Wait()
return err
return g.Wait()
}

// createArtifacts iterates through passed pipe files and their copied data, reads them, constructs k8s artifacts
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/workflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (ap *ActivePlan) taskByName(name string) (*v1beta1.Task, bool) {
//
// Furthermore, a transient ERROR during a step execution, means that the next step may be executed if the step strategy
// is "parallel". In case of a fatal error, it is returned alongside with the new plan status and published on the event bus.
func Execute(pl *ActivePlan, em *engine.Metadata, c client.Client, enh renderer.Enhancer, currentTime time.Time) (*v1beta1.PlanStatus, error) {
func Execute(pl *ActivePlan, em *engine.Metadata, c client.Client, config *rest.Config, enh renderer.Enhancer, currentTime time.Time) (*v1beta1.PlanStatus, error) {
if pl.Status.IsTerminal() {
log.Printf("PlanExecution: %s/%s plan %s is terminal, nothing to do", em.InstanceNamespace, em.InstanceName, pl.Name)
return pl.PlanStatus, nil
Expand Down Expand Up @@ -164,6 +165,7 @@ func Execute(pl *ActivePlan, em *engine.Metadata, c client.Client, enh renderer.
// - 3.c build task context -
ctx := task.Context{
Client: c,
Config: config,
Enhancer: enh,
Meta: exm,
Templates: pl.Templates,
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/workflow/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func TestExecutePlan(t *testing.T) {

for _, tt := range tests {
testClient := fake.NewFakeClientWithScheme(scheme.Scheme)
newStatus, err := Execute(tt.activePlan, tt.metadata, testClient, tt.enhancer, timeNow)
newStatus, err := Execute(tt.activePlan, tt.metadata, testClient, nil, tt.enhancer, timeNow)

if !tt.wantErr && err != nil {
t.Errorf("%s: Expecting no error but got one: %v", tt.name, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/test/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (h *Harness) RunKUDO() error {
h.logger.Log("Setting up instance controller")
err = (&instance.Reconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Recorder: mgr.GetEventRecorderFor("instance-controller"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr)
Expand Down

0 comments on commit e2bc07e

Please sign in to comment.