Skip to content

Commit

Permalink
chore(operator): refactor keptntaskcontroller to use builder interface (
Browse files Browse the repository at this point in the history
#1450)

Signed-off-by: realanna <anna.reale@dynatrace.com>
  • Loading branch information
RealAnna committed May 26, 2023
1 parent ecd8c48 commit a3f5e5b
Show file tree
Hide file tree
Showing 10 changed files with 595 additions and 91 deletions.
3 changes: 2 additions & 1 deletion operator/controllers/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var ErrRetryCountExceeded = fmt.Errorf("retryCount for evaluation exceeded")
var ErrNoValues = fmt.Errorf("no values")
var ErrInvalidOperator = fmt.Errorf("invalid operator")
var ErrCannotMarshalParams = fmt.Errorf("could not marshal parameters")
var ErrNoTaskDefinitionSpec = fmt.Errorf("the TaskDefinition specs are empty")
var ErrUnsupportedWorkloadInstanceResourceReference = fmt.Errorf("unsupported Resource Reference")
var ErrCannotGetKeptnTaskDefinition = fmt.Errorf("cannot retrieve KeptnTaskDefinition")

Expand All @@ -22,7 +23,7 @@ var ErrCannotFetchAppVersionMsg = "could not retrieve KeptnappVersion: %w"
var ErrCannotRetrieveWorkloadInstancesMsg = "could not retrieve KeptnWorkloadInstance: %w"
var ErrCannotRetrieveWorkloadMsg = "could not retrieve KeptnWorkload: %w"
var ErrNoLabelsFoundTask = "no labels found for task: %s"
var ErrNoConfigMapMsg = "No ConfigMap specified or HTTP source specified in TaskDefinition) / Namespace: %s, Name: %s"
var ErrNoConfigMapMsg = "no ConfigMap specified or HTTP source specified in TaskDefinition) / Namespace: %s, Name: %s"
var ErrCannotGetFunctionConfigMap = "could not get function configMap: %w"
var ErrCannotFetchAppVersionForWorkloadInstanceMsg = "could not fetch AppVersion for KeptnWorkloadInstance: "
var ErrCouldNotUnbindSpan = "could not unbind span for %s"
46 changes: 46 additions & 0 deletions operator/controllers/lifecycle/keptntask/container_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package keptntask

import (
"reflect"

"github.com/go-logr/logr"
klcv1alpha3 "github.com/keptn/lifecycle-toolkit/operator/apis/lifecycle/v1alpha3"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// IContainerBuilder is the interface that describes the operations needed to help build job specs of a task
type IContainerBuilder interface {
// CreateContainerWithVolumes returns a job container and volumes based on the task definition spec
CreateContainerWithVolumes(ctx context.Context) (*corev1.Container, []corev1.Volume, error)
}

// BuilderOptions contains everything needed to build the current job
type BuilderOptions struct {
client.Client
recorder record.EventRecorder
req ctrl.Request
Log logr.Logger
task *klcv1alpha3.KeptnTask
taskDef *klcv1alpha3.KeptnTaskDefinition
}

func getContainerBuilder(options BuilderOptions) IContainerBuilder {
if isJSSpecDefined(&options.taskDef.Spec) {
builder := newJSBuilder(options)
return &builder
}
return nil
}

func specExists(definition *klcv1alpha3.KeptnTaskDefinition) bool {
//TODO when adding new builders add more logic here
return isJSSpecDefined(&definition.Spec)
}

func isJSSpecDefined(spec *klcv1alpha3.KeptnTaskDefinitionSpec) bool {
return !reflect.DeepEqual(spec.Function, klcv1alpha3.FunctionSpec{})
}
91 changes: 52 additions & 39 deletions operator/controllers/lifecycle/keptntask/job_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package keptntask
import (
"context"
"fmt"
"reflect"
"math/rand"

"github.com/imdario/mergo"
klcv1alpha3 "github.com/keptn/lifecycle-toolkit/operator/apis/lifecycle/v1alpha3"
apicommon "github.com/keptn/lifecycle-toolkit/operator/apis/lifecycle/v1alpha3/common"
controllercommon "github.com/keptn/lifecycle-toolkit/operator/controllers/common"
controllererrors "github.com/keptn/lifecycle-toolkit/operator/controllers/errors"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func (r *KeptnTaskReconciler) createJob(ctx context.Context, req ctrl.Request, task *klcv1alpha3.KeptnTask) error {
Expand All @@ -22,7 +25,7 @@ func (r *KeptnTaskReconciler) createJob(ctx context.Context, req ctrl.Request, t
return err
}

if !reflect.DeepEqual(definition.Spec.Function, klcv1alpha3.FunctionSpec{}) {
if specExists(definition) {
jobName, err = r.createFunctionJob(ctx, req, task, definition)
if err != nil {
return err
Expand All @@ -40,31 +43,8 @@ func (r *KeptnTaskReconciler) createJob(ctx context.Context, req ctrl.Request, t
}

func (r *KeptnTaskReconciler) createFunctionJob(ctx context.Context, req ctrl.Request, task *klcv1alpha3.KeptnTask, definition *klcv1alpha3.KeptnTaskDefinition) (string, error) {
params, hasParent, err := r.parseFunctionTaskDefinition(definition)
if err != nil {
return "", err
}
if hasParent {
if err := r.handleParent(ctx, req, task, definition, params); err != nil {
return "", err
}
}

params.Context = setupTaskContext(task)

if len(task.Spec.Parameters.Inline) > 0 {
err = mergo.Merge(&params.Parameters, task.Spec.Parameters.Inline)
if err != nil {
controllercommon.RecordEvent(r.Recorder, apicommon.PhaseCreateTask, "Warning", task, "TaskDefinitionMergeFailure", fmt.Sprintf("could not merge KeptnTaskDefinition: %s ", task.Spec.TaskDefinition), "")
return "", err
}
}

if task.Spec.SecureParameters.Secret != "" {
params.SecureParameters = task.Spec.SecureParameters.Secret
}

job, err := r.generateFunctionJob(task, params)
job, err := r.generateJob(ctx, task, definition, req)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -127,21 +107,54 @@ func setupTaskContext(task *klcv1alpha3.KeptnTask) klcv1alpha3.TaskContext {
return taskContext
}

func (r *KeptnTaskReconciler) handleParent(ctx context.Context, req ctrl.Request, task *klcv1alpha3.KeptnTask, definition *klcv1alpha3.KeptnTaskDefinition, params FunctionExecutionParams) error {
var parentJobParams FunctionExecutionParams
parentDefinition, err := controllercommon.GetTaskDefinition(r.Client, r.Log, ctx, definition.Spec.Function.FunctionReference.Name, req.Namespace)
func (r *KeptnTaskReconciler) generateJob(ctx context.Context, task *klcv1alpha3.KeptnTask, definition *klcv1alpha3.KeptnTaskDefinition, request ctrl.Request) (*batchv1.Job, error) {
randomId := rand.Intn(99999-10000) + 10000
jobId := fmt.Sprintf("klc-%s-%d", apicommon.TruncateString(task.Name, apicommon.MaxTaskNameLength), randomId)
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobId,
Namespace: task.Namespace,
Labels: task.Labels,
Annotations: task.CreateKeptnAnnotations(),
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: task.Labels,
Annotations: task.Annotations,
},
Spec: corev1.PodSpec{
RestartPolicy: "OnFailure",
},
},
BackoffLimit: task.Spec.Retries,
ActiveDeadlineSeconds: task.GetActiveDeadlineSeconds(),
},
}
err := controllerutil.SetControllerReference(task, job, r.Scheme)
if err != nil {
controllercommon.RecordEvent(r.Recorder, apicommon.PhaseCreateTask, "Warning", task, "TaskDefinitionNotFound", fmt.Sprintf("could not find KeptnTaskDefinition: %s ", task.Spec.TaskDefinition), "")
return err
r.Log.Error(err, "could not set controller reference:")
}
parentJobParams, _, err = r.parseFunctionTaskDefinition(parentDefinition)
if err != nil {
return err

builderOpt := BuilderOptions{
Client: r.Client,
req: request,
Log: r.Log,
task: task,
taskDef: definition,
recorder: r.Recorder,
}
builder := getContainerBuilder(builderOpt)
if builder == nil {
return nil, controllererrors.ErrNoTaskDefinitionSpec
}
err = mergo.Merge(&params, parentJobParams)
container, volumes, err := builder.CreateContainerWithVolumes(ctx)

if err != nil {
controllercommon.RecordEvent(r.Recorder, apicommon.PhaseCreateTask, "Warning", task, "TaskDefinitionMergeFailure", fmt.Sprintf("could not merge KeptnTaskDefinition: %s ", task.Spec.TaskDefinition), "")
return err
return nil, controllererrors.ErrCannotMarshalParams
}
return nil

job.Spec.Template.Spec.Containers = []corev1.Container{*container}
job.Spec.Template.Spec.Volumes = volumes
return job, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,29 @@ package keptntask
import (
"encoding/json"
"fmt"
"math/rand"
"os"

"github.com/imdario/mergo"
klcv1alpha3 "github.com/keptn/lifecycle-toolkit/operator/apis/lifecycle/v1alpha3"
apicommon "github.com/keptn/lifecycle-toolkit/operator/apis/lifecycle/v1alpha3/common"
controllercommon "github.com/keptn/lifecycle-toolkit/operator/controllers/common"
controllererrors "github.com/keptn/lifecycle-toolkit/operator/controllers/errors"
batchv1 "k8s.io/api/batch/v1"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// JSBuilder implements container builder interface for javascript deno
type JSBuilder struct {
options BuilderOptions
}

func newJSBuilder(options BuilderOptions) JSBuilder {
return JSBuilder{
options: options,
}
}

// FunctionExecutionParams stores parametersrelatedto js deno container creation
type FunctionExecutionParams struct {
ConfigMap string
Parameters map[string]string
Expand All @@ -23,53 +34,29 @@ type FunctionExecutionParams struct {
Context klcv1alpha3.TaskContext
}

func (r *KeptnTaskReconciler) generateFunctionJob(task *klcv1alpha3.KeptnTask, params FunctionExecutionParams) (*batchv1.Job, error) {
randomId := rand.Intn(99999-10000) + 10000
jobId := fmt.Sprintf("klc-%s-%d", apicommon.TruncateString(task.Name, apicommon.MaxTaskNameLength), randomId)
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobId,
Namespace: task.Namespace,
Labels: task.Labels,
Annotations: task.CreateKeptnAnnotations(),
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: task.Labels,
Annotations: task.Annotations,
},
Spec: corev1.PodSpec{
RestartPolicy: "OnFailure",
},
},
BackoffLimit: task.Spec.Retries,
ActiveDeadlineSeconds: task.GetActiveDeadlineSeconds(),
},
}
err := controllerutil.SetControllerReference(task, job, r.Scheme)
if err != nil {
r.Log.Error(err, "could not set controller reference:")
}

func (js *JSBuilder) CreateContainerWithVolumes(ctx context.Context) (*corev1.Container, []corev1.Volume, error) {
container := corev1.Container{
Name: "keptn-function-runner",
Image: os.Getenv("FUNCTION_RUNNER_IMAGE"),
}

var envVars []corev1.EnvVar

params, err := js.getParams(ctx)
if err != nil {
return nil, nil, err
}
if len(params.Parameters) > 0 {
jsonParams, err := json.Marshal(params.Parameters)
if err != nil {
return job, controllererrors.ErrCannotMarshalParams
return nil, nil, err
}
envVars = append(envVars, corev1.EnvVar{Name: "DATA", Value: string(jsonParams)})
}

jsonParams, err := json.Marshal(params.Context)
if err != nil {
return job, controllererrors.ErrCannotMarshalParams
return nil, nil, err
}
envVars = append(envVars, corev1.EnvVar{Name: "CONTEXT", Value: string(jsonParams)})

Expand All @@ -84,24 +71,23 @@ func (r *KeptnTaskReconciler) generateFunctionJob(task *klcv1alpha3.KeptnTask, p
},
})
}

var jobVolumes []corev1.Volume
// Mount the function code if a ConfigMap is provided
// The ConfigMap might be provided manually or created by the TaskDefinition controller
if params.ConfigMap != "" {
envVars = append(envVars, corev1.EnvVar{Name: "SCRIPT", Value: "/var/data/function.ts"})

job.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: "function-mount",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: params.ConfigMap,
},
jobVolumes = append(jobVolumes, corev1.Volume{
Name: "function-mount",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: params.ConfigMap,
},
},
},
}
})

container.VolumeMounts = []corev1.VolumeMount{
{
Name: "function-mount",
Expand All @@ -115,13 +101,38 @@ func (r *KeptnTaskReconciler) generateFunctionJob(task *klcv1alpha3.KeptnTask, p
}

container.Env = envVars
job.Spec.Template.Spec.Containers = []corev1.Container{
container,
return &container, jobVolumes, nil

}

func (js *JSBuilder) getParams(ctx context.Context) (*FunctionExecutionParams, error) {
params, hasParent, err := js.parseFunctionTaskDefinition(js.options.taskDef)
if err != nil {
return nil, err
}
if hasParent {
if err := js.handleParent(ctx, &params); err != nil {
return nil, err
}
}

params.Context = setupTaskContext(js.options.task)

if len(js.options.task.Spec.Parameters.Inline) > 0 {
err = mergo.Merge(&params.Parameters, js.options.task.Spec.Parameters.Inline)
if err != nil {
controllercommon.RecordEvent(js.options.recorder, apicommon.PhaseCreateTask, "Warning", js.options.task, "TaskDefinitionMergeFailure", fmt.Sprintf("could not merge KeptnTaskDefinition: %s ", js.options.task.Spec.TaskDefinition), "")
return nil, err
}
}
return job, nil

if js.options.task.Spec.SecureParameters.Secret != "" {
params.SecureParameters = js.options.task.Spec.SecureParameters.Secret
}
return &params, nil
}

func (r *KeptnTaskReconciler) parseFunctionTaskDefinition(definition *klcv1alpha3.KeptnTaskDefinition) (FunctionExecutionParams, bool, error) {
func (js *JSBuilder) parseFunctionTaskDefinition(definition *klcv1alpha3.KeptnTaskDefinition) (FunctionExecutionParams, bool, error) {
params := FunctionExecutionParams{}

// Firstly check if this task definition has a parent object
Expand All @@ -131,7 +142,7 @@ func (r *KeptnTaskReconciler) parseFunctionTaskDefinition(definition *klcv1alpha
}

if definition.Status.Function.ConfigMap != "" && definition.Spec.Function.HttpReference.Url != "" {
r.Log.Info(fmt.Sprintf("The JobDefinition contains a ConfigMap and a HTTP Reference, ConfigMap is used / Namespace: %s, Name: %s ", definition.Namespace, definition.Name))
js.options.Log.Info(fmt.Sprintf("The JobDefinition contains a ConfigMap and a HTTP Reference, ConfigMap is used / Namespace: %s, Name: %s ", definition.Namespace, definition.Name))
}

// Check if there is a ConfigMap with the function for this object
Expand All @@ -156,3 +167,22 @@ func (r *KeptnTaskReconciler) parseFunctionTaskDefinition(definition *klcv1alpha
}
return params, hasParent, nil
}

func (js *JSBuilder) handleParent(ctx context.Context, params *FunctionExecutionParams) error {
var parentJobParams FunctionExecutionParams
parentDefinition, err := controllercommon.GetTaskDefinition(js.options.Client, js.options.Log, ctx, js.options.taskDef.Spec.Function.FunctionReference.Name, js.options.req.Namespace)
if err != nil {
controllercommon.RecordEvent(js.options.recorder, apicommon.PhaseCreateTask, "Warning", js.options.task, "TaskDefinitionNotFound", fmt.Sprintf("could not find KeptnTaskDefinition: %s ", js.options.task.Spec.TaskDefinition), "")
return err
}
parentJobParams, _, err = js.parseFunctionTaskDefinition(parentDefinition)
if err != nil {
return err
}
err = mergo.Merge(params, parentJobParams)
if err != nil {
controllercommon.RecordEvent(js.options.recorder, apicommon.PhaseCreateTask, "Warning", js.options.task, "TaskDefinitionMergeFailure", fmt.Sprintf("could not merge KeptnTaskDefinition: %s ", js.options.task.Spec.TaskDefinition), "")
return err
}
return nil
}
Loading

0 comments on commit a3f5e5b

Please sign in to comment.