Skip to content

Commit

Permalink
feat: add cloud events support (#1843)
Browse files Browse the repository at this point in the history
Signed-off-by: Giovanni Liva <giovanni.liva@dynatrace.com>
  • Loading branch information
thisthat committed Aug 9, 2023
1 parent f25b24d commit 5b47120
Show file tree
Hide file tree
Showing 30 changed files with 542 additions and 69 deletions.
1 change: 1 addition & 0 deletions docs/content/en/docs/crd-ref/options/v1alpha1/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ _Appears in:_
| --- | --- |
| `OTelCollectorUrl` _string_ | OTelCollectorUrl can be used to set the Open Telemetry collector that the lifecycle operator should use |
| `keptnAppCreationRequestTimeoutSeconds` _integer_ | KeptnAppCreationRequestTimeoutSeconds is used to set the interval in which automatic app discovery searches for workload to put into the same auto-generated KeptnApp |
| `cloudEventsEndpoint` _string_ | CloudEventsEndpoint can be used to set the endpoint where Cloud Events should be posted by the lifecycle operator |


4 changes: 4 additions & 0 deletions helm/chart/templates/keptnconfig-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ spec:
description: OTelCollectorUrl can be used to set the Open Telemetry
collector that the lifecycle operator should use
type: string
cloudEventsEndpoint:
description: CloudEventsEndpoint can be used to set the endpoint where
Cloud Events should be posted by the lifecycle operator
type: string
keptnAppCreationRequestTimeoutSeconds:
default: 30
description: KeptnAppCreationRequestTimeoutSeconds is used to set the
Expand Down
5 changes: 5 additions & 0 deletions lifecycle-operator/apis/options/v1alpha1/keptnconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ type KeptnConfigSpec struct {
// OTelCollectorUrl can be used to set the Open Telemetry collector that the lifecycle operator should use
// +optional
OTelCollectorUrl string `json:"OTelCollectorUrl,omitempty"`

// KeptnAppCreationRequestTimeoutSeconds is used to set the interval in which automatic app discovery
// searches for workload to put into the same auto-generated KeptnApp
// +kubebuilder:default:=30
// +optional
KeptnAppCreationRequestTimeoutSeconds uint `json:"keptnAppCreationRequestTimeoutSeconds,omitempty"`

// CloudEventsEndpoint can be used to set the endpoint where Cloud Events should be posted by the lifecycle operator
// +optional
CloudEventsEndpoint string `json:"cloudEventsEndpoint,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ spec:
description: OTelCollectorUrl can be used to set the Open Telemetry
collector that the lifecycle operator should use
type: string
cloudEventsEndpoint:
description: CloudEventsEndpoint can be used to set the endpoint where
Cloud Events should be posted by the lifecycle operator
type: string
keptnAppCreationRequestTimeoutSeconds:
default: 30
description: KeptnAppCreationRequestTimeoutSeconds is used to set
Expand Down
11 changes: 11 additions & 0 deletions lifecycle-operator/controllers/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ const defaultKeptnAppCreationRequestTimeout = 30 * time.Second
type IConfig interface {
SetCreationRequestTimeout(value time.Duration)
GetCreationRequestTimeout() time.Duration
SetCloudEventsEndpoint(endpoint string)
GetCloudEventsEndpoint() string
}

type ControllerConfig struct {
keptnAppCreationRequestTimeout time.Duration
cloudEventsEndpoint string
}

var instance *ControllerConfig
Expand All @@ -34,3 +37,11 @@ func (o *ControllerConfig) SetCreationRequestTimeout(value time.Duration) {
func (o *ControllerConfig) GetCreationRequestTimeout() time.Duration {
return o.keptnAppCreationRequestTimeout
}

func (o *ControllerConfig) SetCloudEventsEndpoint(endpoint string) {
o.cloudEventsEndpoint = endpoint
}

func (o *ControllerConfig) GetCloudEventsEndpoint() string {
return o.cloudEventsEndpoint
}
81 changes: 81 additions & 0 deletions lifecycle-operator/controllers/common/config/fake/config_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func TestEvaluationHandler(t *testing.T) {
handler := EvaluationHandler{
SpanHandler: &spanHandlerMock,
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(fakeRecorder),
EventSender: NewK8sSender(fakeRecorder),
Client: fake.NewClientBuilder().WithObjects(&tt.evalObj).Build(),
Tracer: trace.NewNoopTracerProvider().Tracer("tracer"),
Scheme: scheme.Scheme,
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestEvaluationHandler_createEvaluation(t *testing.T) {
handler := EvaluationHandler{
SpanHandler: &kltfake.ISpanHandlerMock{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
EventSender: NewK8sSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().Build(),
Tracer: trace.NewNoopTracerProvider().Tracer("tracer"),
Scheme: scheme.Scheme,
Expand Down
91 changes: 84 additions & 7 deletions lifecycle-operator/controllers/common/eventsender.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package common

import (
"context"
"fmt"
"strings"

ce "github.com/cloudevents/sdk-go/v2"
"github.com/go-logr/logr"
apicommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/config"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -15,28 +20,100 @@ type IEvent interface {

// ===== Main =====

func NewEventSender(recorder record.EventRecorder) IEvent {
return newK8sSender(recorder)
type EventMultiplexer struct {
logger logr.Logger
emitters []IEvent
}

func NewEventMultiplexer(logger logr.Logger, recorder record.EventRecorder, client ce.Client) *EventMultiplexer {
multiplexer := &EventMultiplexer{
logger: logger,
}
multiplexer.register(newCloudEventSender(logger, client))
multiplexer.register(NewK8sSender(recorder))
return multiplexer
}

func (e *EventMultiplexer) register(emitter IEvent) {
if emitter != nil {
e.emitters = append(e.emitters, emitter)
}
}

func (e *EventMultiplexer) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
for _, emitter := range e.emitters {
e.logger.Info(fmt.Sprintf("Emitting event using %T", emitter))
go emitter.Emit(phase, eventType, reconcileObject, status, message, version)
}
}

// ===== Cloud Event Sender =====
// TODO: implement Cloud Event logic

type cloudEvent struct {
client ce.Client
logger logr.Logger
}

func newCloudEventSender(logger logr.Logger, client ce.Client) *cloudEvent {
return &cloudEvent{
client: client,
logger: logger,
}
}

// Emit creates a Cloud Event and send it to the endpoint
func (e *cloudEvent) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
endpoint := config.Instance().GetCloudEventsEndpoint()
if endpoint == "" {
// if no endpoint is configured we don't emit any event
if !strings.HasPrefix(endpoint, "http") {
e.logger.V(5).Info(fmt.Sprintf("CloudEvent endpoint configured but it does not start with http: %s", endpoint))
}
return
}
event := ce.NewEvent()
event.SetSource("keptn.sh")
event.SetType(fmt.Sprintf("%s.%s", phase.LongName, status))

msg := setEventMessage(phase, reconcileObject, message, version)
err := event.SetData(ce.ApplicationJSON, map[string]interface{}{
"message": msg,
"type": eventType,
"version": version,
"resource": map[string]string{
"group": reconcileObject.GetObjectKind().GroupVersionKind().Group,
"kind": reconcileObject.GetObjectKind().GroupVersionKind().Kind,
"version": reconcileObject.GetObjectKind().GroupVersionKind().Version,
"name": reconcileObject.GetName(),
"namespace": reconcileObject.GetNamespace(),
},
})
if err != nil {
e.logger.V(5).Info(fmt.Sprintf("Failed to set data for CloudEvent: %v", err))
return
}

ctx := ce.ContextWithTarget(context.TODO(), endpoint)
if result := e.client.Send(ctx, event); ce.IsUndelivered(result) {
e.logger.V(5).Info(fmt.Sprintf("Failed to send CloudEvent: %v", event))
}
}

// ===== K8s Event Sender =====

type k8sEvent struct {
recorder record.EventRecorder
}

func newK8sSender(recorder record.EventRecorder) IEvent {
func NewK8sSender(recorder record.EventRecorder) IEvent {
return &k8sEvent{
recorder: recorder,
}
}

// SendEvent creates k8s Event and adds it to Eventqueue
func (s *k8sEvent) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
// Emit creates k8s Event and adds it to Eventqueue
func (e *k8sEvent) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
msg := setEventMessage(phase, reconcileObject, message, version)
annotations := setAnnotations(reconcileObject, phase)
s.recorder.AnnotatedEventf(reconcileObject, annotations, eventType, fmt.Sprintf("%s%s", phase.ShortName, status), msg)
e.recorder.AnnotatedEventf(reconcileObject, annotations, eventType, fmt.Sprintf("%s%s", phase.ShortName, status), msg)
}
Loading

0 comments on commit 5b47120

Please sign in to comment.