Skip to content

Commit

Permalink
feat(scheduler): Add tracing support (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
thisthat committed Oct 7, 2022
1 parent a195614 commit 60651d1
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 11 deletions.
37 changes: 35 additions & 2 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,54 @@ limitations under the License.
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/keptn-sandbox/lifecycle-controller/scheduler/pkg/klcpermit"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/component-base/cli"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
"os"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/stdout"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {

tp := initOTel()

rand.Seed(time.Now().UnixNano())
command := app.NewSchedulerCommand(
app.WithPlugin(klcpermit.Name, klcpermit.New),
)

code := cli.Run(command)

err := tp.Shutdown(context.TODO())
if err != nil {
fmt.Println(err)
}
os.Exit(code)

}

func initOTel() *sdktrace.TracerProvider {
var err error
exp, err := stdout.NewExporter(stdout.WithPrettyPrint())
if err != nil {
log.Panicf("failed to initialize stdout exporter %v\n", err)
}
bsp := sdktrace.NewSimpleSpanProcessor(exp)
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSpanProcessor(bsp),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp
}
7 changes: 4 additions & 3 deletions scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ module github.com/keptn-sandbox/lifecycle-controller/scheduler
go 1.18

require (
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/stdout v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
k8s.io/api v0.24.3
k8s.io/apimachinery v0.24.3
k8s.io/client-go v0.24.3
Expand Down Expand Up @@ -67,13 +71,10 @@ require (
go.opentelemetry.io/contrib v0.20.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0 // indirect
go.opentelemetry.io/otel v0.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect
go.opentelemetry.io/otel/metric v0.20.0 // indirect
go.opentelemetry.io/otel/sdk v0.20.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions scheduler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,8 @@ go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel/exporters/otlp v0.20.0 h1:PTNgq9MRmQqqJY0REVbZFvwkYOA85vbdQU/nVfxDyqg=
go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM=
go.opentelemetry.io/otel/exporters/stdout v0.20.0 h1:NXKkOWV7Np9myYrQE0wqRS3SbwzbupHu07rDONKubMo=
go.opentelemetry.io/otel/exporters/stdout v0.20.0/go.mod h1:t9LUU3JvYlmoPA61abhvsXxKh58xdyi3nMtI6JiR8v0=
go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8=
go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw=
Expand Down
44 changes: 38 additions & 6 deletions scheduler/pkg/klcpermit/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package klcpermit

import (
"context"

"github.com/keptn-sandbox/lifecycle-controller/scheduler/pkg/tracing"
"go.opentelemetry.io/otel/codes"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var workloadInstanceResource = schema.GroupVersionResource{Group: "lifecycle.keptn.sh", Version: "v1alpha1", Resource: "keptnworkloadinstances"}
Expand Down Expand Up @@ -39,41 +43,53 @@ type Manager interface {

type WorkloadManager struct {
dynamicClient dynamic.Interface
Tracer trace.Tracer
}

func NewWorkloadManager(d dynamic.Interface) *WorkloadManager {
sMgr := &WorkloadManager{
dynamicClient: d,
Tracer: otel.Tracer("keptn/scheduler"),
}
return sMgr
}

var bindCRDSpan = make(map[string]trace.Span, 100)

func (sMgr *WorkloadManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
//List workloadInstance run CRDs
name := GetCRDName(pod)
name := getCRDName(pod)
crd, err := sMgr.GetCRD(ctx, pod.Namespace, name)

if err != nil {
klog.Infof("[Keptn Permit Plugin] could not find workloadInstance crd %s, err:%s", name, err.Error())
return WorkloadInstanceNotFound
}

ctx, span := sMgr.getSpan(ctx, crd, pod)

//check CRD status
phase, found, err := unstructured.NestedString(crd.UnstructuredContent(), "status", "preDeploymentStatus")
klog.Infof("[Keptn Permit Plugin] workloadInstance crd %s, found %s with phase %s ", crd, found, phase)
if err == nil && found {
span.AddEvent("StatusEvaluation", trace.WithAttributes(tracing.Status.String(phase)))
switch KeptnState(phase) {
case StatePending:
return Wait
case StateFailed:
span.SetStatus(codes.Error, "Failed")
span.End()
unbindSpan(pod)
return Failure
case StateSucceeded:
span.End()
unbindSpan(pod)
return Success
case StatePending:
return Wait
case StateRunning:
return Wait
case StateUnknown:
return Wait
}

}
return WorkloadInstanceStatusNotSpecified
}
Expand All @@ -84,9 +100,25 @@ func (sMgr *WorkloadManager) GetCRD(ctx context.Context, namespace string, name
return sMgr.dynamicClient.Resource(workloadInstanceResource).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
}

func GetCRDName(pod *corev1.Pod) string {
func (sMgr *WorkloadManager) getSpan(ctx context.Context, crd *unstructured.Unstructured, pod *corev1.Pod) (context.Context, trace.Span) {
name := getCRDName(pod)
if span, ok := bindCRDSpan[name]; ok {
return ctx, span
}
ctx, span := tracing.CreateSpan(ctx, crd, sMgr.Tracer, pod.Namespace)
//TODO store only sampled one and cap it
bindCRDSpan[name] = span
return ctx, span
}

func getCRDName(pod *corev1.Pod) string {
application := pod.Annotations["keptn.sh/app"]
workloadInstance := pod.Annotations["keptn.sh/workload"]
version := pod.Annotations["keptn.sh/version"]
return application + "-" + workloadInstance + "-" + version
}

func unbindSpan(pod *corev1.Pod) {
name := getCRDName(pod)
delete(bindCRDSpan, name)
}
25 changes: 25 additions & 0 deletions scheduler/pkg/tracing/carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package tracing

import "fmt"

// KeptnCarrier carries the TraceContext
type KeptnCarrier map[string]interface{}

// Get returns the value associated with the passed key.
func (kc KeptnCarrier) Get(key string) string {
return fmt.Sprintf("%v", kc[key])
}

// Set stores the key-value pair.
func (kc KeptnCarrier) Set(key string, value string) {
kc[key] = value
}

// Keys lists the keys stored in this carrier.
func (kc KeptnCarrier) Keys() []string {
keys := make([]string, 0, len(kc))
for k := range kc {
keys = append(keys, k)
}
return keys
}
41 changes: 41 additions & 0 deletions scheduler/pkg/tracing/semconv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tracing

import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

const (
ApplicationName attribute.Key = attribute.Key("keptn.deployment.app_name")
Workload attribute.Key = attribute.Key("keptn.deployment.workload")
Version attribute.Key = attribute.Key("keptn.deployment.version")
Namespace attribute.Key = attribute.Key("keptn.deployment.namespace")
Status attribute.Key = attribute.Key("keptn.deployment.status")
)

func CreateSpan(ctx context.Context, crd *unstructured.Unstructured, t trace.Tracer, ns string) (context.Context, trace.Span) {
// search for annotations
annotations, found, _ := unstructured.NestedMap(crd.UnstructuredContent(), "metadata", "annotations")
if found {
ctx = otel.GetTextMapPropagator().Extract(ctx, KeptnCarrier(annotations))
}
ctx, span := t.Start(ctx, "schedule")

appName, found, _ := unstructured.NestedString(crd.UnstructuredContent(), "spec", "app")
if found {
span.SetAttributes(ApplicationName.String(appName))
}
workload, found, _ := unstructured.NestedString(crd.UnstructuredContent(), "spec", "workloadName")
if found {
span.SetAttributes(Workload.String(workload))
}
version, found, _ := unstructured.NestedString(crd.UnstructuredContent(), "spec", "version")
if found {
span.SetAttributes(Version.String(version))
}
span.SetAttributes(Namespace.String(ns))
return ctx, span
}

0 comments on commit 60651d1

Please sign in to comment.