Skip to content

Commit

Permalink
feat(server): add support for span status (#3029)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Aug 4, 2023
1 parent 3821f71 commit f2b1e4c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 42 deletions.
27 changes: 20 additions & 7 deletions server/model/spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
)

const (
TracetestMetadataFieldStartTime string = "tracetest.span.start_time"
TracetestMetadataFieldEndTime string = "tracetest.span.end_time"
TracetestMetadataFieldDuration string = "tracetest.span.duration"
TracetestMetadataFieldType string = "tracetest.span.type"
TracetestMetadataFieldName string = "tracetest.span.name"
TracetestMetadataFieldParentID string = "tracetest.span.parent_id"
TracetestMetadataFieldKind string = "tracetest.span.kind"
TracetestMetadataFieldStartTime string = "tracetest.span.start_time"
TracetestMetadataFieldEndTime string = "tracetest.span.end_time"
TracetestMetadataFieldDuration string = "tracetest.span.duration"
TracetestMetadataFieldType string = "tracetest.span.type"
TracetestMetadataFieldName string = "tracetest.span.name"
TracetestMetadataFieldParentID string = "tracetest.span.parent_id"
TracetestMetadataFieldKind string = "tracetest.span.kind"
TracetestMetadataFieldStatusCode string = "tracetest.span.status_code"
TracetestMetadataFieldStatusDescription string = "tracetest.span.status_description"
)

type Attributes map[string]string
Expand Down Expand Up @@ -74,11 +76,17 @@ type Span struct {
Attributes Attributes
Kind SpanKind
Events []SpanEvent
Status *SpanStatus

Parent *Span `json:"-"`
Children []*Span `json:"-"`
}

type SpanStatus struct {
Code string
Description string
}

func (s *Span) injectEventsIntoAttributes() {
if s.Events == nil {
s.Events = make([]SpanEvent, 0)
Expand Down Expand Up @@ -217,6 +225,11 @@ func (span Span) setMetadataAttributes() Span {
span.Attributes[TracetestMetadataFieldStartTime] = fmt.Sprintf("%d", span.StartTime.UnixNano())
span.Attributes[TracetestMetadataFieldEndTime] = fmt.Sprintf("%d", span.EndTime.UnixNano())

if span.Status != nil {
span.Attributes[TracetestMetadataFieldStatusCode] = span.Status.Code
span.Attributes[TracetestMetadataFieldStatusDescription] = span.Status.Description
}

return span
}

Expand Down
82 changes: 47 additions & 35 deletions server/resourcemanager/resource_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package resourcemanager

import (
"context"
"database/sql"
"encoding/json"
"errors"
Expand All @@ -11,6 +12,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -232,26 +234,28 @@ func (m *manager[T]) instrumentRoute(route *mux.Route) {

route.Handler(newHandler)
}

func (m *manager[T]) methodNotAllowed(w http.ResponseWriter, r *http.Request) {
writeError(w, EncoderFromRequest(r), http.StatusMethodNotAllowed, fmt.Errorf("resource %s does not support the action", m.resourceTypeSingular))
writeError(r.Context(), w, EncoderFromRequest(r), http.StatusMethodNotAllowed, fmt.Errorf("resource %s does not support the action", m.resourceTypeSingular))
}

func (m *manager[T]) create(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
encoder := EncoderFromRequest(r)

targetResource := Resource[T]{}
err := encoder.DecodeRequestBody(&targetResource)
if err != nil {
writeError(w, encoder, http.StatusBadRequest, fmt.Errorf("cannot parse body: %w", err))
writeError(ctx, w, encoder, http.StatusBadRequest, fmt.Errorf("cannot parse body: %w", err))
return
}

// TODO: if resourceType != values.resourceType return error

m.doCreate(w, r, encoder, targetResource.Spec)
m.doCreate(ctx, w, r, encoder, targetResource.Spec)
}

func (m *manager[T]) doCreate(w http.ResponseWriter, r *http.Request, encoder Encoder, specs T) {
func (m *manager[T]) doCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, encoder Encoder, specs T) {
if !specs.HasID() {
specs = m.rh.SetID(specs, m.config.idgen())
}
Expand All @@ -262,12 +266,12 @@ func (m *manager[T]) doCreate(w http.ResponseWriter, r *http.Request, encoder En
specs.GetID(),
err.Error(),
)
writeError(w, encoder, http.StatusBadRequest, err)
writeError(ctx, w, encoder, http.StatusBadRequest, err)
}

created, err := m.rh.Create(r.Context(), specs)
created, err := m.rh.Create(ctx, specs)
if err != nil {
m.handleResourceHandlerError(w, "creating", err, encoder)
m.handleResourceHandlerError(ctx, w, "creating", err, encoder)
return
}

Expand All @@ -278,50 +282,52 @@ func (m *manager[T]) doCreate(w http.ResponseWriter, r *http.Request, encoder En

err = encoder.WriteEncodedResponse(w, http.StatusCreated, newResource)
if err != nil {
writeError(w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
writeError(ctx, w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
}
}

func (m *manager[T]) upsert(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
encoder := EncoderFromRequest(r)

targetResource := Resource[T]{}
err := encoder.DecodeRequestBody(&targetResource)
if err != nil {
writeError(w, encoder, http.StatusBadRequest, fmt.Errorf("cannot parse body: %w", err))
writeError(ctx, w, encoder, http.StatusBadRequest, fmt.Errorf("cannot parse body: %w", err))
return
}

// if there's no ID given, create the resource
if !targetResource.Spec.HasID() {
m.doCreate(w, r, encoder, targetResource.Spec)
m.doCreate(ctx, w, r, encoder, targetResource.Spec)
return
}

_, err = m.rh.Get(r.Context(), targetResource.Spec.GetID())
_, err = m.rh.Get(ctx, targetResource.Spec.GetID())
if err != nil {
// if the given ID is not found, create the resource
if errors.Is(err, sql.ErrNoRows) {
m.doCreate(w, r, encoder, targetResource.Spec)
m.doCreate(ctx, w, r, encoder, targetResource.Spec)
return
} else {
// some actual error, return it
writeError(w, encoder, http.StatusInternalServerError, fmt.Errorf("could not get entity: %w", err))
writeError(ctx, w, encoder, http.StatusInternalServerError, fmt.Errorf("could not get entity: %w", err))
return
}
}

// the resurce exists, update it
m.doUpdate(w, r, encoder, targetResource.Spec)
m.doUpdate(ctx, w, r, encoder, targetResource.Spec)
}

func (m *manager[T]) update(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
encoder := EncoderFromRequest(r)

targetResource := Resource[T]{}
err := encoder.DecodeRequestBody(&targetResource)
if err != nil {
writeError(w, encoder, http.StatusBadRequest, fmt.Errorf("cannot parse body: %w", err))
writeError(ctx, w, encoder, http.StatusBadRequest, fmt.Errorf("cannot parse body: %w", err))
return
}

Expand All @@ -335,27 +341,27 @@ func (m *manager[T]) update(w http.ResponseWriter, r *http.Request) {
targetResource.Spec.GetID(),
urlID,
)
writeError(w, encoder, http.StatusBadRequest, err)
writeError(ctx, w, encoder, http.StatusBadRequest, err)
return
}
targetResource.Spec = m.rh.SetID(targetResource.Spec, urlID)

m.doUpdate(w, r, encoder, targetResource.Spec)
m.doUpdate(ctx, w, r, encoder, targetResource.Spec)
}

func (m *manager[T]) doUpdate(w http.ResponseWriter, r *http.Request, encoder Encoder, specs T) {
func (m *manager[T]) doUpdate(ctx context.Context, w http.ResponseWriter, r *http.Request, encoder Encoder, specs T) {
if err := specs.Validate(); err != nil {
err := fmt.Errorf(
"an error occurred while validating the resource: %s. error: %s",
specs.GetID(),
err.Error(),
)
writeError(w, encoder, http.StatusBadRequest, err)
writeError(ctx, w, encoder, http.StatusBadRequest, err)
}

updated, err := m.rh.Update(r.Context(), specs)
updated, err := m.rh.Update(ctx, specs)
if err != nil {
m.handleResourceHandlerError(w, "updating", err, encoder)
m.handleResourceHandlerError(ctx, w, "updating", err, encoder)
return
}

Expand All @@ -366,7 +372,7 @@ func (m *manager[T]) doUpdate(w http.ResponseWriter, r *http.Request, encoder En

err = encoder.WriteEncodedResponse(w, http.StatusOK, newResource)
if err != nil {
writeError(w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
writeError(ctx, w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
}
}

Expand Down Expand Up @@ -422,13 +428,13 @@ func (m *manager[T]) list(w http.ResponseWriter, r *http.Request) {
query, sortBy,
sortDirection, err := paginationParams(r, m.rh.SortingFields())
if err != nil {
writeError(w, encoder, http.StatusBadRequest, fmt.Errorf("cannot process request: %s", err.Error()))
writeError(ctx, w, encoder, http.StatusBadRequest, fmt.Errorf("cannot process request: %s", err.Error()))
return
}

count, err := m.rh.Count(ctx, query)
if err != nil {
m.handleResourceHandlerError(w, "listing", err, encoder)
m.handleResourceHandlerError(ctx, w, "listing", err, encoder)
return
}

Expand All @@ -446,7 +452,7 @@ func (m *manager[T]) list(w http.ResponseWriter, r *http.Request) {
sortDirection,
)
if err != nil {
m.handleResourceHandlerError(w, "listing", err, encoder)
m.handleResourceHandlerError(ctx, w, "listing", err, encoder)
return
}

Expand All @@ -470,7 +476,7 @@ func (m *manager[T]) list(w http.ResponseWriter, r *http.Request) {
err = encoder.WriteEncodedResponse(w, http.StatusOK, resourceList)

if err != nil {
writeError(w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
writeError(ctx, w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
}
}

Expand All @@ -481,6 +487,7 @@ func isRequestForAugmented(r *http.Request) bool {
}

func (m *manager[T]) get(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
encoder := EncoderFromRequest(r)

vars := mux.Vars(r)
Expand All @@ -491,9 +498,9 @@ func (m *manager[T]) get(w http.ResponseWriter, r *http.Request) {
getterFn = m.rh.GetAugmented
}

item, err := getterFn(r.Context(), id)
item, err := getterFn(ctx, id)
if err != nil {
m.handleResourceHandlerError(w, "getting", err, encoder)
m.handleResourceHandlerError(ctx, w, "getting", err, encoder)
return
}

Expand All @@ -504,42 +511,47 @@ func (m *manager[T]) get(w http.ResponseWriter, r *http.Request) {

err = encoder.WriteEncodedResponse(w, http.StatusOK, newResource)
if err != nil {
writeError(w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
writeError(ctx, w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
}
}

func (m *manager[T]) delete(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
encoder := EncoderFromRequest(r)

vars := mux.Vars(r)
id := id.ID(vars["id"])

err := m.rh.Delete(r.Context(), id)
err := m.rh.Delete(ctx, id)
if err != nil {
m.handleResourceHandlerError(w, "deleting", err, encoder)
m.handleResourceHandlerError(ctx, w, "deleting", err, encoder)
return
}

encoder.WriteEncodedResponse(w, http.StatusNoContent, nil)
}

func (m *manager[T]) handleResourceHandlerError(w http.ResponseWriter, verb string, err error, encoder Encoder) {
func (m *manager[T]) handleResourceHandlerError(ctx context.Context, w http.ResponseWriter, verb string, err error, encoder Encoder) {
// 404 - not found
if errors.Is(err, sql.ErrNoRows) {
encoder.WriteEncodedResponse(w, http.StatusNotFound, nil)
return
}

if errors.Is(err, validation.ErrValidation) {
writeError(w, encoder, http.StatusBadRequest, err)
writeError(ctx, w, encoder, http.StatusBadRequest, err)
}

// 500 - internal server error
err = fmt.Errorf("error %s resource %s: %w", verb, m.resourceTypeSingular, err)
writeError(w, encoder, http.StatusInternalServerError, err)
writeError(ctx, w, encoder, http.StatusInternalServerError, err)
}

func writeError(w http.ResponseWriter, enc Encoder, code int, err error) {
func writeError(ctx context.Context, w http.ResponseWriter, enc Encoder, code int, err error) {
span := trace.SpanFromContext(ctx)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

err = enc.WriteEncodedResponse(w, code, map[string]any{
"code": code,
"error": err.Error(),
Expand Down
9 changes: 9 additions & 0 deletions server/traces/otel_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func ConvertOtelSpanIntoSpan(span *v1.Span) *model.Span {
endTime = time.Unix(0, int64(span.GetEndTimeUnixNano()))
}

var spanStatus *model.SpanStatus
if span.Status != nil {
spanStatus = &model.SpanStatus{
Code: span.Status.Code.String(),
Description: span.Status.Message,
}
}

spanID := createSpanID(span.SpanId)
attributes[model.TracetestMetadataFieldParentID] = createSpanID(span.ParentSpanId).String()
return &model.Span{
Expand All @@ -61,6 +69,7 @@ func ConvertOtelSpanIntoSpan(span *v1.Span) *model.Span {
EndTime: endTime,
Parent: nil,
Events: extractEvents(span),
Status: spanStatus,
Children: make([]*model.Span, 0),
Attributes: attributes,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,5 @@ spec:
- selector: span[name="POST /api/tests" tracetest.span.type="http"]
assertions:
- attr:tracetest.selected_spans.count = 1
- attr:tracetest.span.status_code = "STATUS_CODE_ERROR"
- attr:tracetest.span.status_description contains "test with same ID already exists"

0 comments on commit f2b1e4c

Please sign in to comment.