/
zipkinv1_to_protospan.go
508 lines (433 loc) · 16.2 KB
/
zipkinv1_to_protospan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zipkin
import (
"encoding/json"
"fmt"
"math"
"strconv"
"time"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)
var (
// ZipkinV1 friendly conversion errors. These are context messages attached
// with errors.WithMessage to the underlying error, so the caller can tell
// which part of the Zipkin V1 payload was rejected.
msgZipkinV1JSONUnmarshalError = "zipkinv1"
msgZipkinV1TraceIDError = "zipkinV1 span traceId"
msgZipkinV1SpanIDError = "zipkinV1 span id"
msgZipkinV1ParentIDError = "zipkinV1 span parentId"
// Generic hex to ID conversion errors. The errHexTraceID* values are returned
// by hexTraceIDToOCTraceID and the errHexID* values by hexIDToOCID.
errHexTraceIDWrongLen = errors.New("hex traceId span has wrong length (expected 16 or 32)")
errHexTraceIDParsing = errors.New("failed to parse hex traceId")
errHexTraceIDZero = errors.New("traceId is zero")
errHexIDWrongLen = errors.New("hex Id has wrong length (expected 16)")
errHexIDParsing = errors.New("failed to parse hex Id")
errHexIDZero = errors.New("Id is zero")
)
// Trace translation from Zipkin V1 is a bit of a special case: there is no Go model
// for Zipkin V1 spans to reuse, so the JSON shape is declared here. zipkinV1Span
// follows the span definition published at:
// https://zipkin.io/zipkin-api/zipkin-api.yaml
type zipkinV1Span struct {
// TraceID is the hex encoded trace id; the converter accepts 16 or 32 characters.
TraceID string `json:"traceId"`
// Name is the span name; it is only copied to the OC span when non-empty.
Name string `json:"name,omitempty"`
// ParentID is the hex encoded parent span id; empty when the span has no parent.
ParentID string `json:"parentId,omitempty"`
// ID is the hex encoded span id; the converter requires exactly 16 characters.
ID string `json:"id"`
// Timestamp is the span start in microseconds since the epoch. When it is zero
// the start and end times are derived from the annotations instead.
Timestamp int64 `json:"timestamp"`
// Duration is added to Timestamp to compute the end time, so it is expressed
// in the same unit (microseconds).
Duration int64 `json:"duration"`
Debug bool `json:"debug,omitempty"`
// Annotations are timestamped events; some values ("cs", "sr", ...) carry the span kind.
Annotations []*annotation `json:"annotations,omitempty"`
// BinaryAnnotations are key/value tags that become OC span attributes.
BinaryAnnotations []*binaryAnnotation `json:"binaryAnnotations,omitempty"`
}
// endpoint is the network/service identity attached to Zipkin V1 annotations and
// binary annotations. Its fields identify the node that spans are grouped under
// when a batch is built.
type endpoint struct {
// ServiceName becomes the name of the OC node's ServiceInfo.
ServiceName string `json:"serviceName"`
// IPv4, IPv6 and Port are copied to the node attributes when they are set.
IPv4 string `json:"ipv4"`
IPv6 string `json:"ipv6"`
Port int32 `json:"port"`
}
// annotation is a timestamped event of a zipkinV1Span.
type annotation struct {
// Timestamp is the event time in microseconds since the epoch.
Timestamp int64 `json:"timestamp"`
// Value is the event name. The values "cs", "cr", "ms", "mr", "ss" and "sr"
// describe the span kind; annotations with an empty value are ignored.
Value string `json:"value"`
// Endpoint identifies the service that recorded the event; it may be nil.
Endpoint *endpoint `json:"endpoint"`
}
// binaryAnnotation is a key/value tag of a zipkinV1Span.
type binaryAnnotation struct {
// Key is the tag name and is used as the OC attribute key.
Key string `json:"key"`
// Value is always received as a string; during conversion it is stored as an
// integer or boolean attribute when it parses as one, and as a string otherwise.
Value string `json:"value"`
// Endpoint identifies the service that recorded the tag; it may be nil.
Endpoint *endpoint `json:"endpoint"`
}
// V1JSONBatchToOCProto decodes blob as a JSON array of Zipkin v1 spans and converts
// it to OC Proto trace data, grouped by the endpoint each span was reported from.
//
// A decoding failure is returned wrapped with the "zipkinv1" context message. The
// first span that cannot be converted aborts the whole batch and its error is
// returned unchanged.
func V1JSONBatchToOCProto(blob []byte) ([]consumerdata.TraceData, error) {
	var v1Spans []*zipkinV1Span
	err := json.Unmarshal(blob, &v1Spans)
	if err != nil {
		return nil, errors.WithMessage(err, msgZipkinV1JSONUnmarshalError)
	}

	converted := make([]ocSpanAndParsedAnnotations, len(v1Spans))
	for i := range v1Spans {
		span, parsed, convErr := zipkinV1ToOCSpan(v1Spans[i])
		if convErr != nil {
			// The converter already adds context to its errors, so no extra wrapping here.
			return nil, convErr
		}
		converted[i] = ocSpanAndParsedAnnotations{
			ocSpan:            span,
			parsedAnnotations: parsed,
		}
	}

	return zipkinToOCProtoBatch(converted)
}
// ocSpanAndParsedAnnotations pairs a converted OC span with the result of parsing
// its Zipkin annotations, so that the batching step can read the endpoint the span
// belongs to.
type ocSpanAndParsedAnnotations struct {
// ocSpan is the converted span.
ocSpan *tracepb.Span
// parsedAnnotations holds the endpoint used to pick the span's batch.
parsedAnnotations *annotationParseResult
}
// zipkinToOCProtoBatch groups the converted spans by endpoint and returns one
// TraceData per distinct endpoint. The order of the returned batches follows map
// iteration and is therefore unspecified. The returned error is always nil.
func zipkinToOCProtoBatch(ocSpansAndParsedAnnotations []ocSpanAndParsedAnnotations) ([]consumerdata.TraceData, error) {
	// Keyed by the endpoint's string form; every value carries the node built for that endpoint.
	byNode := map[string]*consumerdata.TraceData{}
	for i := range ocSpansAndParsedAnnotations {
		entry := &ocSpansAndParsedAnnotations[i]
		td := getOrCreateNodeRequest(byNode, entry.parsedAnnotations.Endpoint)
		td.Spans = append(td.Spans, entry.ocSpan)
	}

	batches := make([]consumerdata.TraceData, 0, len(byNode))
	for _, td := range byNode {
		batches = append(batches, *td)
	}
	return batches, nil
}
// zipkinV1ToOCSpan converts one Zipkin V1 span to an OC proto span.
//
// It returns the converted span together with the parsed annotations, whose
// endpoint the caller uses to group spans into batches. An error is returned when
// zSpan is nil or when the trace id, span id or (non-empty) parent id is not a
// valid hex id; id errors are wrapped with a message naming the offending field.
func zipkinV1ToOCSpan(zSpan *zipkinV1Span) (*tracepb.Span, *annotationParseResult, error) {
	if zSpan == nil {
		// A JSON batch such as `[null]` decodes to a nil element; report it
		// instead of panicking on the field accesses below.
		return nil, nil, errors.New("zipkinV1 span is nil")
	}

	traceID, err := hexTraceIDToOCTraceID(zSpan.TraceID)
	if err != nil {
		return nil, nil, errors.WithMessage(err, msgZipkinV1TraceIDError)
	}

	spanID, err := hexIDToOCID(zSpan.ID)
	if err != nil {
		return nil, nil, errors.WithMessage(err, msgZipkinV1SpanIDError)
	}

	// The parent id is optional: an empty value leaves ParentSpanId unset.
	var parentID []byte
	if zSpan.ParentID != "" {
		id, err := hexIDToOCID(zSpan.ParentID)
		if err != nil {
			return nil, nil, errors.WithMessage(err, msgZipkinV1ParentIDError)
		}
		parentID = id
	}

	parsedAnnotations := parseZipkinV1Annotations(zSpan.Annotations)
	attributes, ocStatus, localComponent := zipkinV1BinAnnotationsToOCAttributes(zSpan.BinaryAnnotations)

	// When the annotations did not yield a service name, fall back to the one
	// derived from the binary annotations.
	if parsedAnnotations.Endpoint.ServiceName == unknownServiceName && localComponent != "" {
		parsedAnnotations.Endpoint.ServiceName = localComponent
	}

	// A zero span timestamp means the start/end times have to be taken from the
	// earliest and latest annotations instead.
	var startTime, endTime *timestamp.Timestamp
	if zSpan.Timestamp == 0 {
		startTime = parsedAnnotations.EarlyAnnotationTime
		endTime = parsedAnnotations.LateAnnotationTime
	} else {
		startTime = epochMicrosecondsToTimestamp(zSpan.Timestamp)
		endTime = epochMicrosecondsToTimestamp(zSpan.Timestamp + zSpan.Duration)
	}

	ocSpan := &tracepb.Span{
		TraceId:      traceID,
		SpanId:       spanID,
		ParentSpanId: parentID,
		Status:       ocStatus,
		Kind:         parsedAnnotations.Kind,
		TimeEvents:   parsedAnnotations.TimeEvents,
		StartTime:    startTime,
		EndTime:      endTime,
		Attributes:   attributes,
	}

	if zSpan.Name != "" {
		ocSpan.Name = &tracepb.TruncatableString{Value: zSpan.Name}
	}

	setSpanKind(ocSpan, parsedAnnotations.Kind, parsedAnnotations.ExtendedKind)
	SetTimestampsIfUnset(ocSpan)

	return ocSpan, parsedAnnotations, nil
}
// setSpanKind records extendedKind as a TagSpanKind string attribute on ocSpan when
// the span kind cannot be expressed through the OC Kind field, i.e. when kind is
// unspecified but extendedKind is not. In every other case ocSpan is left untouched.
func setSpanKind(ocSpan *tracepb.Span, kind tracepb.Span_SpanKind, extendedKind tracetranslator.OpenTracingSpanKind) {
	if kind != tracepb.Span_SPAN_KIND_UNSPECIFIED ||
		extendedKind == tracetranslator.OpenTracingSpanKindUnspecified {
		return
	}

	// The kind has no OC equivalent. Carrying it as an attribute lets it travel
	// through the pipeline until an exporter translates it back.
	attrs := ocSpan.Attributes
	if attrs == nil {
		attrs = &tracepb.Span_Attributes{}
		ocSpan.Attributes = attrs
	}
	if attrs.AttributeMap == nil {
		attrs.AttributeMap = make(map[string]*tracepb.AttributeValue, 1)
	}

	kindValue := &tracepb.TruncatableString{Value: string(extendedKind)}
	attrs.AttributeMap[tracetranslator.TagSpanKind] = &tracepb.AttributeValue{
		Value: &tracepb.AttributeValue_StringValue{StringValue: kindValue},
	}
}
// zipkinV1BinAnnotationsToOCAttributes converts Zipkin V1 binary annotations to OC
// span attributes.
//
// Each value is stored as an integer attribute when it parses as a base-10 int64,
// as a boolean attribute when it parses as a bool, and as a string otherwise. The
// zipkincore.LOCAL_COMPONENT key is renamed to "component". Every attribute is
// offered to the status mapper, which may consume (drop) it; the mapper's result is
// returned as status.
//
// fallbackServiceName is the service name of the last annotation endpoint that had
// one, or the local component value when no endpoint provided a name. It is only
// reported when at least one attribute was kept. Nil entries in binAnnotations are
// skipped.
func zipkinV1BinAnnotationsToOCAttributes(binAnnotations []*binaryAnnotation) (attributes *tracepb.Span_Attributes, status *tracepb.Status, fallbackServiceName string) {
	if len(binAnnotations) == 0 {
		return nil, nil, ""
	}

	sMapper := &statusMapper{}
	var localComponent string
	attributeMap := make(map[string]*tracepb.AttributeValue)
	for _, binAnnotation := range binAnnotations {
		if binAnnotation == nil {
			// JSON such as "binaryAnnotations":[null] decodes to a nil element;
			// skip it, as parseZipkinV1Annotations does for annotations.
			continue
		}

		if binAnnotation.Endpoint != nil && binAnnotation.Endpoint.ServiceName != "" {
			fallbackServiceName = binAnnotation.Endpoint.ServiceName
		}

		pbAttrib := &tracepb.AttributeValue{}
		if iValue, err := strconv.ParseInt(binAnnotation.Value, 10, 64); err == nil {
			pbAttrib.Value = &tracepb.AttributeValue_IntValue{IntValue: iValue}
		} else if bValue, err := strconv.ParseBool(binAnnotation.Value); err == nil {
			pbAttrib.Value = &tracepb.AttributeValue_BoolValue{BoolValue: bValue}
		} else {
			// For now all else go to string
			pbAttrib.Value = &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: binAnnotation.Value}}
		}

		key := binAnnotation.Key
		if key == zipkincore.LOCAL_COMPONENT {
			// TODO: (@pjanotti) add reference to OpenTracing and change related tags to use them
			key = "component"
			localComponent = binAnnotation.Value
		}

		if drop := sMapper.fromAttribute(key, pbAttrib); drop {
			continue
		}

		attributeMap[key] = pbAttrib
	}

	status = sMapper.ocStatus()

	if len(attributeMap) == 0 {
		return nil, status, ""
	}

	if fallbackServiceName == "" {
		fallbackServiceName = localComponent
	}

	attributes = &tracepb.Span_Attributes{
		AttributeMap: attributeMap,
	}

	return attributes, status, fallbackServiceName
}
// annotationParseResult stores the results of examining the original annotations,
// so that multiple passes over the annotations are not needed.
type annotationParseResult struct {
// Endpoint is the endpoint of the first span-kind annotation that has a service
// name, or a placeholder named unknownServiceName when there is none.
Endpoint *endpoint
// TimeEvents holds the annotations that do not describe the span kind; it is nil
// when there are none.
TimeEvents *tracepb.Span_TimeEvents
// Kind is the OC span kind taken from the first span-kind annotation.
Kind tracepb.Span_SpanKind
// ExtendedKind also covers the producer/consumer kinds that Kind cannot express.
ExtendedKind tracetranslator.OpenTracingSpanKind
// EarlyAnnotationTime and LateAnnotationTime are the times of the annotations
// with the smallest and largest timestamps.
EarlyAnnotationTime *timestamp.Timestamp
LateAnnotationTime *timestamp.Timestamp
}
// unknownServiceName works both as the default service name and as a flag
// indicating that no endpoint with a valid service name was found.
const unknownServiceName = "unknown-service"
// parseZipkinV1Annotations walks the Zipkin V1 annotations once and collects
// everything the span conversion needs from them: the endpoint, the span kind, the
// earliest and latest annotation times and the OC time events.
//
// Nil annotations and annotations with an empty value are ignored. The annotations
// "cs", "cr", "ms", "mr", "ss" and "sr" describe the span kind and are not turned
// into time events. The returned result always has a non-nil Endpoint; when no
// span-kind annotation carries a service name it is a placeholder named
// unknownServiceName.
func parseZipkinV1Annotations(annotations []*annotation) *annotationParseResult {
	result := &annotationParseResult{}

	// Zipkin V1 annotations carry a timestamp, which makes them a good match for OC TimeEvents.
	events := make([]*tracepb.Span_TimeEvent, 0, len(annotations))

	// Smallest and largest annotation timestamps seen so far.
	minTimestamp, maxTimestamp := int64(math.MaxInt64), int64(math.MinInt64)

	// Only the first annotation that describes a span kind decides the kind.
	kindAssigned := false

	for _, ann := range annotations {
		if ann == nil || ann.Value == "" {
			continue
		}

		serviceName := unknownServiceName
		if ep := ann.Endpoint; ep != nil && ep.ServiceName != "" {
			serviceName = ep.ServiceName
		}

		// Translate the Zipkin value into the Kind/ExtendedKind pair, noting
		// whether the annotation describes a span kind at all.
		var (
			kind         tracepb.Span_SpanKind
			extendedKind tracetranslator.OpenTracingSpanKind
		)
		carriesKind := true
		switch ann.Value {
		case "cs", "cr":
			kind = tracepb.Span_CLIENT
			extendedKind = tracetranslator.OpenTracingSpanKindClient
		case "ms":
			// Producer and consumer have no OC representation: Kind stays
			// unspecified and only ExtendedKind is used for translation.
			extendedKind = tracetranslator.OpenTracingSpanKindProducer
		case "mr":
			extendedKind = tracetranslator.OpenTracingSpanKindConsumer
		case "ss", "sr":
			kind = tracepb.Span_SERVER
			extendedKind = tracetranslator.OpenTracingSpanKindServer
		default:
			carriesKind = false
		}

		if carriesKind {
			// The endpoint comes from the first span-kind annotation that names a service.
			if result.Endpoint == nil && serviceName != unknownServiceName {
				result.Endpoint = ann.Endpoint
			}
			if !kindAssigned {
				result.Kind = kind
				result.ExtendedKind = extendedKind
				kindAssigned = true
			}
		}

		ts := epochMicrosecondsToTimestamp(ann.Timestamp)
		if ann.Timestamp < minTimestamp {
			minTimestamp = ann.Timestamp
			result.EarlyAnnotationTime = ts
		}
		if ann.Timestamp > maxTimestamp {
			maxTimestamp = ann.Timestamp
			result.LateAnnotationTime = ts
		}

		if carriesKind {
			// Send/receive annotations only contribute kind and timing, not a time event.
			continue
		}

		// A tracepb.Span_TimeEvent_Message would be cheaper, but it would lose
		// information, so the annotation form is used: the annotation value is
		// the attribute key and the service name is the attribute value.
		serviceAttr := &tracepb.AttributeValue{
			Value: &tracepb.AttributeValue_StringValue{
				StringValue: &tracepb.TruncatableString{Value: serviceName},
			},
		}
		events = append(events, &tracepb.Span_TimeEvent{
			Time: ts,
			Value: &tracepb.Span_TimeEvent_Annotation_{
				Annotation: &tracepb.Span_TimeEvent_Annotation{
					Attributes: &tracepb.Span_Attributes{
						AttributeMap: map[string]*tracepb.AttributeValue{
							ann.Value: serviceAttr,
						},
					},
				},
			},
		})
	}

	if len(events) > 0 {
		result.TimeEvents = &tracepb.Span_TimeEvents{TimeEvent: events}
	}

	if result.Endpoint == nil {
		result.Endpoint = &endpoint{ServiceName: unknownServiceName}
	}

	return result
}
func hexTraceIDToOCTraceID(hex string) ([]byte, error) {
// Per info at https://zipkin.io/zipkin-api/zipkin-api.yaml it should be 16 or 32 characters
hexLen := len(hex)
if hexLen != 16 && hexLen != 32 {
return nil, errHexTraceIDWrongLen
}
var high, low uint64
var err error
if hexLen == 32 {
if high, err = strconv.ParseUint(hex[:16], 16, 64); err != nil {
return nil, errHexTraceIDParsing
}
}
if low, err = strconv.ParseUint(hex[hexLen-16:], 16, 64); err != nil {
return nil, errHexTraceIDParsing
}
if high == 0 && low == 0 {
return nil, errHexTraceIDZero
}
return tracetranslator.UInt64ToByteTraceID(high, low), nil
}
// hexIDToOCID converts a hex encoded Zipkin span id to the OC byte representation.
//
// Per https://zipkin.io/zipkin-api/zipkin-api.yaml the id is exactly 16 characters
// long. It returns errHexIDWrongLen for any other length, errHexIDParsing when the
// text is not valid hex, and errHexIDZero when the id is zero.
func hexIDToOCID(hex string) ([]byte, error) {
	const hexIDLen = 16
	if len(hex) != hexIDLen {
		return nil, errHexIDWrongLen
	}

	id, parseErr := strconv.ParseUint(hex, 16, 64)
	switch {
	case parseErr != nil:
		return nil, errHexIDParsing
	case id == 0:
		return nil, errHexIDZero
	}

	return tracetranslator.UInt64ToByteSpanID(id), nil
}
func epochMicrosecondsToTimestamp(msecs int64) *timestamp.Timestamp {
if msecs <= 0 {
return nil
}
t := ×tamp.Timestamp{}
t.Seconds = msecs / 1e6
t.Nanos = int32(msecs%1e6) * 1e3
return t
}
// getOrCreateNodeRequest returns the TraceData registered in m for endpoint,
// creating and registering it first when there is none yet. A new entry gets a node
// whose service name is the endpoint's, plus the endpoint attributes when the
// endpoint has any.
//
// endpoint must not be nil; callers of this private function guarantee that.
func getOrCreateNodeRequest(m map[string]*consumerdata.TraceData, endpoint *endpoint) *consumerdata.TraceData {
	key := endpoint.string()
	if existing := m[key]; existing != nil {
		return existing
	}

	node := &commonpb.Node{
		ServiceInfo: &commonpb.ServiceInfo{Name: endpoint.ServiceName},
	}
	if attrs := endpoint.createAttributeMap(); attrs != nil {
		node.Attributes = attrs
	}

	created := &consumerdata.TraceData{Node: node}
	m[key] = created
	return created
}
// string renders the endpoint as "<service>-<ipv4>-<ipv6>-<port>". The result is
// used as the map key that groups spans by endpoint.
func (ep *endpoint) string() string {
	port := strconv.FormatInt(int64(ep.Port), 10)
	return ep.ServiceName + "-" + ep.IPv4 + "-" + ep.IPv6 + "-" + port
}
// createAttributeMap returns the endpoint's network details as string attributes
// under the keys "ipv4", "ipv6" and "port". Only the fields that are set are
// included; nil is returned when none of them is.
func (ep *endpoint) createAttributeMap() map[string]string {
	hasIPv4, hasIPv6, hasPort := ep.IPv4 != "", ep.IPv6 != "", ep.Port != 0
	if !(hasIPv4 || hasIPv6 || hasPort) {
		return nil
	}

	attrs := make(map[string]string, 3)
	if hasIPv4 {
		attrs["ipv4"] = ep.IPv4
	}
	if hasIPv6 {
		attrs["ipv6"] = ep.IPv6
	}
	if hasPort {
		attrs["port"] = strconv.Itoa(int(ep.Port))
	}
	return attrs
}
// SetTimestampsIfUnset gives span a start and end time when its start time is
// missing. Both are set to the current time and the span is tagged with a true
// StartTimeAbsent attribute so the substitution can be recognized later. Spans that
// already have a start time are left untouched.
//
// Zipkin allows the timestamp to be unset, while opentelemetry-collector expects a
// value: without one, the conversion from OpenCensus to the internal trace format
// breaks what should be an identity transformation oc -> internal -> oc.
func SetTimestampsIfUnset(span *tracepb.Span) {
	if span.StartTime != nil {
		return
	}

	stamp := internal.TimeToTimestamp(time.Now())
	span.StartTime, span.EndTime = stamp, stamp

	attrs := span.Attributes
	if attrs == nil {
		attrs = &tracepb.Span_Attributes{}
		span.Attributes = attrs
	}
	if attrs.AttributeMap == nil {
		attrs.AttributeMap = make(map[string]*tracepb.AttributeValue, 1)
	}
	attrs.AttributeMap[StartTimeAbsent] = &tracepb.AttributeValue{
		Value: &tracepb.AttributeValue_BoolValue{BoolValue: true},
	}
}