Skip to content

Commit

Permalink
Extract part of logic from PlanExecutionController into function (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
alenkacz committed Jul 23, 2019
1 parent 6e9d71c commit 22d619c
Showing 1 changed file with 151 additions and 117 deletions.
268 changes: 151 additions & 117 deletions pkg/controller/planexecution/planexecution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"log"
"strconv"
"strings"

"github.com/kudobuilder/kudo/pkg/util/kudo"

Expand Down Expand Up @@ -272,45 +273,6 @@ func (r *ReconcilePlanExecution) Reconcile(request reconcile.Request) (reconcile
return reconcile.Result{}, err
}

// Load parameters:

// Create config map to hold all parameters for instantiation
configs := make(map[string]interface{})

// Default parameters from instance metadata
configs["OperatorName"] = operatorVersion.Spec.Operator.Name
configs["Name"] = instance.Name
configs["Namespace"] = instance.Namespace

params := make(map[string]interface{})
for k, v := range instance.Spec.Parameters {
params[k] = v
}

// Merge defaults with customizations
for _, param := range operatorVersion.Spec.Parameters {
_, ok := params[param.Name]
if !ok {
// Not specified in params
if param.Required && param.Default == nil {
err = fmt.Errorf("parameter %v was required but not provided by instance %v", param.Name, instance.Name)
log.Printf("PlanExecutionController: %v", err)
r.recorder.Event(planExecution, "Warning", "MissingParameter", fmt.Sprintf("Could not find required parameter (%v)", param.Name))
return reconcile.Result{}, err
}
params[param.Name] = kudo.StringValue(param.Default)
}
}

configs["Params"] = params

// Get Plan from OperatorVersion.
//
// Right now must match exactly. In the future have defaults/backups: e.g., if no
// "upgrade", call "update"; if no "update", call "deploy"
//
// When we have this we'll have to keep the active plan in the status since that might
// not match the "requested" plan.
executedPlan, ok := operatorVersion.Spec.Plans[planExecution.Spec.PlanName]
if !ok {
r.recorder.Event(planExecution, "Warning", "InvalidPlan", fmt.Sprintf("Could not find required plan (%v)", planExecution.Spec.PlanName))
Expand All @@ -319,87 +281,17 @@ func (r *ReconcilePlanExecution) Reconcile(request reconcile.Request) (reconcile
return reconcile.Result{}, err
}

planExecution.Status.Name = planExecution.Spec.PlanName
planExecution.Status.Strategy = executedPlan.Strategy

planExecution.Status.Phases = make([]kudov1alpha1.PhaseStatus, len(executedPlan.Phases))
for i, phase := range executedPlan.Phases {
// Populate the Status elements in instance.
planExecution.Status.Phases[i].Name = phase.Name
planExecution.Status.Phases[i].Strategy = phase.Strategy
planExecution.Status.Phases[i].State = kudov1alpha1.PhaseStatePending
planExecution.Status.Phases[i].Steps = make([]kudov1alpha1.StepStatus, len(phase.Steps))
for j, step := range phase.Steps {
// Fetch OperatorVersion:
//
// - Get the task name from the step
// - Get the task definition from the OV
// - Create the kustomize templates
// - Apply
configs["PlanName"] = planExecution.Spec.PlanName
configs["PhaseName"] = phase.Name
configs["StepName"] = step.Name
configs["StepNumber"] = strconv.FormatInt(int64(j), 10)

var objs []runtime.Object

engine := kudoengine.New()
for _, t := range step.Tasks {
// resolve task
if taskSpec, ok := operatorVersion.Spec.Tasks[t]; ok {
resources := make(map[string]string)

for _, res := range taskSpec.Resources {
if resource, ok := operatorVersion.Spec.Templates[res]; ok {
templatedYaml, err := engine.Render(resource, configs)
if err != nil {
r.recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error expanding template: %v", err))
log.Printf("PlanExecutionController: Error expanding template: %v", err)
planExecution.Status.State = kudov1alpha1.PhaseStateError
planExecution.Status.Phases[i].State = kudov1alpha1.PhaseStateError
// returning error = nil so that we don't retry since this is non-recoverable
return reconcile.Result{}, nil
}
resources[res] = templatedYaml

} else {
r.recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error finding resource named %v for operator version %v", res, operatorVersion.Name))
log.Printf("PlanExecutionController: Error finding resource named %v for operator version %v", res, operatorVersion.Name)
return reconcile.Result{}, err
}
}

objsToAdd, err := applyConventionsToTemplates(resources, metadata{
InstanceName: instance.Name,
Namespace: instance.Namespace,
OperatorName: operatorVersion.Spec.Operator.Name,
OperatorVersion: operatorVersion.Spec.Version,
PlanExecution: planExecution.Name,
PlanName: planExecution.Spec.PlanName,
PhaseName: phase.Name,
StepName: step.Name,
})

if err != nil {
r.recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error creating Kubernetes objects from step %v in phase %v of plan %v: %v", step.Name, phase.Name, planExecution.Name, err))
log.Printf("PlanExecutionController: Error creating Kubernetes objects from step %v in phase %v of plan %v: %v", step.Name, phase.Name, planExecution.Name, err)
return reconcile.Result{}, err
}
objs = append(objs, objsToAdd...)
} else {
r.recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error finding task named %s for operator version %s", taskSpec, operatorVersion.Name))
log.Printf("PlanExecutionController: Error finding task named %s for operator version %s", taskSpec, operatorVersion.Name)
return reconcile.Result{}, err
}
}

planExecution.Status.Phases[i].Steps[j].Name = step.Name
planExecution.Status.Phases[i].Steps[j].Objects = objs
planExecution.Status.Phases[i].Steps[j].Delete = step.Delete
log.Printf("PlanExecutionController: Phase \"%v\" Step \"%v\" of instance '%v' has %v object(s)", phase.Name, step.Name, instance.Name, len(objs))
err = populatePlanExecution(executedPlan, planExecution, instance, operatorVersion, r.recorder)
if err != nil {
_, fatalError := err.(*fatalError)
if fatalError {
// do not retry
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

// now we're actually starting with the execution of plan/phase/step
for i, phase := range planExecution.Status.Phases {
// If we still want to execute phases in this plan check if phase is healthy
for j, s := range phase.Steps {
Expand Down Expand Up @@ -542,6 +434,148 @@ func (r *ReconcilePlanExecution) Reconcile(request reconcile.Request) (reconcile
return reconcile.Result{}, nil
}

// fatalError is representing type of error that is non-recoverable (like bug in the template preventing rendering)
// we should not retry these errors
type fatalError struct {
err error
}

func (e fatalError) Error() string {
return fmt.Sprintf("Fatal error: %v", e.err)
}

// populatePlanExecution reads content of the Plan defined in operator version and populates PlanExecution with data from rendered templates
func populatePlanExecution(activePlan kudov1alpha1.Plan, planExecution *kudov1alpha1.PlanExecution, instance *kudov1alpha1.Instance, operatorVersion *kudov1alpha1.OperatorVersion, recorder record.EventRecorder) error {
// Load parameters:

// Create config map to hold all parameters for instantiation
configs := make(map[string]interface{})

// Default parameters from instance metadata
configs["OperatorName"] = operatorVersion.Spec.Operator.Name
configs["Name"] = instance.Name
configs["Namespace"] = instance.Namespace

params, err := getParameters(instance, operatorVersion)
if err != nil {
log.Printf("PlanExecutionController: %v", err)
recorder.Event(planExecution, "Warning", "MissingParameter", err.Error())
return err
}

configs["Params"] = params

planExecution.Status.Name = planExecution.Spec.PlanName
planExecution.Status.Strategy = activePlan.Strategy

planExecution.Status.Phases = make([]kudov1alpha1.PhaseStatus, len(activePlan.Phases))
for i, phase := range activePlan.Phases {
// Populate the Status elements in instance.
planExecution.Status.Phases[i].Name = phase.Name
planExecution.Status.Phases[i].Strategy = phase.Strategy
planExecution.Status.Phases[i].State = kudov1alpha1.PhaseStatePending
planExecution.Status.Phases[i].Steps = make([]kudov1alpha1.StepStatus, len(phase.Steps))
for j, step := range phase.Steps {
// Fetch OperatorVersion:
//
// - Get the task name from the step
// - Get the task definition from the OV
// - Create the kustomize templates
// - Apply
configs["PlanName"] = planExecution.Spec.PlanName
configs["PhaseName"] = phase.Name
configs["StepName"] = step.Name
configs["StepNumber"] = strconv.FormatInt(int64(j), 10)

var objs []runtime.Object

engine := kudoengine.New()
for _, t := range step.Tasks {
// resolve task
if taskSpec, ok := operatorVersion.Spec.Tasks[t]; ok {
resources := make(map[string]string)

for _, res := range taskSpec.Resources {
if resource, ok := operatorVersion.Spec.Templates[res]; ok {
templatedYaml, err := engine.Render(resource, configs)
if err != nil {
recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error expanding template: %v", err))
log.Printf("PlanExecutionController: Error expanding template: %v", err)
planExecution.Status.State = kudov1alpha1.PhaseStateError
planExecution.Status.Phases[i].State = kudov1alpha1.PhaseStateError
// returning error = nil so that we don't retry since this is non-recoverable
return fatalError{err: err}
}
resources[res] = templatedYaml

} else {
recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error finding resource named %v for operator version %v", res, operatorVersion.Name))
log.Printf("PlanExecutionController: Error finding resource named %v for operator version %v", res, operatorVersion.Name)
return fatalError{err: fmt.Errorf("PlanExecutionController: Error finding resource named %v for operator version %v", res, operatorVersion.Name)}
}
}

objsToAdd, err := applyConventionsToTemplates(resources, metadata{
InstanceName: instance.Name,
Namespace: instance.Namespace,
OperatorName: operatorVersion.Spec.Operator.Name,
OperatorVersion: operatorVersion.Spec.Version,
PlanExecution: planExecution.Name,
PlanName: planExecution.Spec.PlanName,
PhaseName: phase.Name,
StepName: step.Name,
})

if err != nil {
recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error creating Kubernetes objects from step %v in phase %v of plan %v: %v", step.Name, phase.Name, planExecution.Name, err))
log.Printf("PlanExecutionController: Error creating Kubernetes objects from step %v in phase %v of plan %v: %v", step.Name, phase.Name, planExecution.Name, err)
return err
}
objs = append(objs, objsToAdd...)
} else {
recorder.Event(planExecution, "Warning", "InvalidPlanExecution", fmt.Sprintf("Error finding task named %s for operator version %s", taskSpec, operatorVersion.Name))
log.Printf("PlanExecutionController: Error finding task named %s for operator version %s", taskSpec, operatorVersion.Name)
return fatalError{err: err}
}
}

planExecution.Status.Phases[i].Steps[j].Name = step.Name
planExecution.Status.Phases[i].Steps[j].Objects = objs
planExecution.Status.Phases[i].Steps[j].Delete = step.Delete
log.Printf("PlanExecutionController: Phase \"%v\" Step \"%v\" of instance '%v' has %v object(s)", phase.Name, step.Name, instance.Name, len(objs))
}
}

return nil
}

func getParameters(instance *kudov1alpha1.Instance, operatorVersion *kudov1alpha1.OperatorVersion) (map[string]string, error) {
params := make(map[string]string)

for k, v := range instance.Spec.Parameters {
params[k] = v
}

missingRequiredParameters := make([]string, 0)
// Merge defaults with customizations
for _, param := range operatorVersion.Spec.Parameters {
_, ok := params[param.Name]
if !ok && param.Required && param.Default == nil {
// instance does not define this parameter and there is no default while the parameter is required -> error
missingRequiredParameters = append(missingRequiredParameters, param.Name)

} else if !ok {
params[param.Name] = kudo.StringValue(param.Default)
}
}

if len(missingRequiredParameters) != 0 {
return nil, fmt.Errorf("parameters are missing when evaluating template: %s", strings.Join(missingRequiredParameters, ","))
}

return params, nil
}

// Cleanup modifies objects on the cluster to allow for the provided obj to get CreateOrApply.
// Currently only needs to clean up Jobs that get run from multiplePlanExecutions
func (r *ReconcilePlanExecution) Cleanup(obj runtime.Object) error {
Expand Down

0 comments on commit 22d619c

Please sign in to comment.