Skip to content

Commit

Permalink
Zipkin Receiver: default timestamp (open-telemetry#1068)
Browse files Browse the repository at this point in the history
Fixes open-telemetry#954

open-telemetry#954 is being caused from the conversion of OC -> Internal -> OC span types. 

OC technically allows nil times because its a pointer, but [docs state](https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/trace/v1/trace.proto#L125-L135)

```go
// This field is semantically required. When not set on receive -
// receiver should set it to the value of end_time field if it was
// set. Or to the current time if neither was set. It is important to
// keep end_time > start_time for consistency.
//
// This field is required.
StartTime *timestamp.Timestamp
```

So this pr changes the zipkin receiver to set the start/end timestamps if they're not set on receive
  • Loading branch information
chris-smith-zocdoc authored and wyTrivail committed Jul 13, 2020
1 parent 7266133 commit 884f775
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 33 deletions.
2 changes: 1 addition & 1 deletion exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.Trace

body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return len(td.Spans), err
return len(td.Spans), consumererror.Permanent(err)
}

req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body))
Expand Down
1 change: 1 addition & 0 deletions receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb.
}

node := nodeFromZipkinEndpoints(zs, pbs)
zipkin.SetTimestampsIfUnset(pbs)

return pbs, node
}
Expand Down
3 changes: 2 additions & 1 deletion receiver/zipkinreceiver/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,8 @@ func TestSpanKindTranslation(t *testing.T) {
TraceID: zipkinmodel.TraceID{Low: 123},
ID: 456,
},
Kind: tt.zipkinKind,
Kind: tt.zipkinKind,
Timestamp: time.Now(),
}
ocSpan, _ := zipkinSpanToTraceSpan(zs)
assert.EqualValues(t, tt.ocKind, ocSpan.Kind)
Expand Down
1 change: 1 addition & 0 deletions translator/trace/zipkin/attributekeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ const (
RemoteEndpointIPv6 = "zipkin.remoteEndpoint.ipv6"
RemoteEndpointPort = "zipkin.remoteEndpoint.port"
RemoteEndpointServiceName = "zipkin.remoteEndpoint.serviceName"
StartTimeAbsent = "otel.zipkin.absentField.startTime"
)
7 changes: 7 additions & 0 deletions translator/trace/zipkin/protospan_to_zipkinv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package zipkin
import (
"net"
"strconv"
"time"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
Expand Down Expand Up @@ -243,7 +244,13 @@ func OCSpanProtoToZipkin(
if spanKindFromAttributes {
redundantKeys[tracetranslator.TagSpanKind] = true
}

startTime := internal.TimestampToTime(s.StartTime)
if _, ok := attrMap[StartTimeAbsent]; ok {
redundantKeys[StartTimeAbsent] = true
startTime = time.Time{}
}

z := &zipkinmodel.SpanModel{
SpanContext: zipkinmodel.SpanContext{
TraceID: convertTraceID(s.TraceId),
Expand Down
25 changes: 25 additions & 0 deletions translator/trace/zipkin/zipkinv1_to_protospan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"strconv"
"time"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/pkg/errors"

"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

Expand Down Expand Up @@ -175,6 +177,7 @@ func zipkinV1ToOCSpan(zSpan *zipkinV1Span) (*tracepb.Span, *annotationParseResul
}

setSpanKind(ocSpan, parsedAnnotations.Kind, parsedAnnotations.ExtendedKind)
SetTimestampsIfUnset(ocSpan)

return ocSpan, parsedAnnotations, nil
}
Expand Down Expand Up @@ -481,3 +484,25 @@ func (ep *endpoint) createAttributeMap() map[string]string {
}
return attributeMap
}

func SetTimestampsIfUnset(span *tracepb.Span) {
// zipkin allows timestamp to be unset, but opentelemetry-collector expects it to have a value.
// If this is unset, the conversion from open census to the internal trace format breaks
// what should be an identity transformation oc -> internal -> oc
if span.StartTime == nil {
now := internal.TimeToTimestamp(time.Now())
span.StartTime = now
span.EndTime = now

if span.Attributes == nil {
span.Attributes = &tracepb.Span_Attributes{}
}
if span.Attributes.AttributeMap == nil {
span.Attributes.AttributeMap = make(map[string]*tracepb.AttributeValue, 1)
}
span.Attributes.AttributeMap[StartTimeAbsent] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_BoolValue{
BoolValue: true,
}}
}
}
114 changes: 83 additions & 31 deletions translator/trace/zipkin/zipkinv1_to_protospan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sort"
"strconv"
"testing"
"time"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

Expand Down Expand Up @@ -247,14 +249,15 @@ func sortTraceByNodeName(trace []consumerdata.TraceData) {

func TestZipkinAnnotationsToOCStatus(t *testing.T) {
type test struct {
name string
haveTags []*binaryAnnotation
wantAttributes *tracepb.Span_Attributes
wantStatus *tracepb.Status
}

cases := []test{
// only status.code tag
{
name: "only status.code tag",
haveTags: []*binaryAnnotation{{
Key: "status.code",
Value: "13",
Expand All @@ -264,17 +267,19 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
Code: 13,
},
},
// only status.message tag

{
name: "only status.message tag",
haveTags: []*binaryAnnotation{{
Key: "status.message",
Value: "Forbidden",
}},
wantAttributes: nil,
wantStatus: nil,
},
// both status.code and status.message

{
name: "both status.code and status.message",
haveTags: []*binaryAnnotation{
{
Key: "status.code",
Expand All @@ -292,8 +297,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
},
},

// http status.code
{
name: "http status.code",
haveTags: []*binaryAnnotation{
{
Key: "http.status_code",
Expand Down Expand Up @@ -324,8 +329,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
},
},

// http and oc
{
name: "http and oc",
haveTags: []*binaryAnnotation{
{
Key: "http.status_code",
Expand Down Expand Up @@ -364,8 +369,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
},
},

// http and only oc code
{
name: "http and only oc code",
haveTags: []*binaryAnnotation{
{
Key: "http.status_code",
Expand Down Expand Up @@ -398,8 +403,9 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
Code: 14,
},
},
// http and only oc message

{
name: "http and only oc message",
haveTags: []*binaryAnnotation{
{
Key: "http.status_code",
Expand Down Expand Up @@ -434,8 +440,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
},
},

// census tags
{
name: "census tags",
haveTags: []*binaryAnnotation{
{
Key: "census.status_code",
Expand All @@ -453,8 +459,8 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
},
},

// census tags priority over others
{
name: "census tags priority over others",
haveTags: []*binaryAnnotation{
{
Key: "census.status_code",
Expand Down Expand Up @@ -506,33 +512,79 @@ func TestZipkinAnnotationsToOCStatus(t *testing.T) {
fakeSpanID := "0000000000000001"

for i, c := range cases {
zSpans := []*zipkinV1Span{{
ID: fakeSpanID,
TraceID: fakeTraceID,
BinaryAnnotations: c.haveTags,
}}
zBytes, err := json.Marshal(zSpans)
if err != nil {
t.Errorf("#%d: Unexpected error: %v", i, err)
continue
}
gb, err := V1JSONBatchToOCProto(zBytes)
if err != nil {
t.Errorf("#%d: Unexpected error: %v", i, err)
continue
}
gs := gb[0].Spans[0]
t.Run(c.name, func(t *testing.T) {
zSpans := []*zipkinV1Span{{
ID: fakeSpanID,
TraceID: fakeTraceID,
BinaryAnnotations: c.haveTags,
Timestamp: 1,
}}
zBytes, err := json.Marshal(zSpans)
if err != nil {
t.Errorf("#%d: Unexpected error: %v", i, err)
return
}
gb, err := V1JSONBatchToOCProto(zBytes)
if err != nil {
t.Errorf("#%d: Unexpected error: %v", i, err)
return
}
gs := gb[0].Spans[0]

if !reflect.DeepEqual(gs.Attributes, c.wantAttributes) {
t.Fatalf("Unsuccessful conversion\nGot:\n\t%v\nWant:\n\t%v", gs.Attributes, c.wantAttributes)
}
if !reflect.DeepEqual(gs.Attributes, c.wantAttributes) {
t.Fatalf("Unsuccessful conversion\nGot:\n\t%v\nWant:\n\t%v", gs.Attributes, c.wantAttributes)
}

if !reflect.DeepEqual(gs.Status, c.wantStatus) {
t.Fatalf("Unsuccessful conversion: %d\nGot:\n\t%v\nWant:\n\t%v", i, gs.Status, c.wantStatus)
}
if !reflect.DeepEqual(gs.Status, c.wantStatus) {
t.Fatalf("Unsuccessful conversion: %d\nGot:\n\t%v\nWant:\n\t%v", i, gs.Status, c.wantStatus)
}
})
}
}

func TestSpanWithoutTimestampGetsTag(t *testing.T) {
fakeTraceID := "00000000000000010000000000000002"
fakeSpanID := "0000000000000001"
zSpans := []*zipkinV1Span{
{
ID: fakeSpanID,
TraceID: fakeTraceID,
Timestamp: 0, // no timestamp field
},
}
zBytes, err := json.Marshal(zSpans)
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}

testStart := time.Now()

gb, err := V1JSONBatchToOCProto(zBytes)
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}

gs := gb[0].Spans[0]
assert.NotNil(t, gs.StartTime)
assert.NotNil(t, gs.EndTime)

assert.True(t, internal.TimestampToTime(gs.StartTime).Sub(testStart) >= 0)

wantAttributes := &tracepb.Span_Attributes{
AttributeMap: map[string]*tracepb.AttributeValue{
StartTimeAbsent: {
Value: &tracepb.AttributeValue_BoolValue{
BoolValue: true,
},
},
},
}

assert.EqualValues(t, gs.Attributes, wantAttributes)
}

func TestJSONHTTPToGRPCStatusCode(t *testing.T) {
fakeTraceID := "00000000000000010000000000000002"
fakeSpanID := "0000000000000001"
Expand Down

0 comments on commit 884f775

Please sign in to comment.