forked from instana/go-sensor
/
topic.go
97 lines (80 loc) · 2.99 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2020
//go:build go1.11
// +build go1.11
package pubsub
import (
"cloud.google.com/go/pubsub"
"context"
instana "github.com/mier85/go-sensor"
"github.com/mier85/go-sensor/instrumentation/cloud.google.com/go/internal/tags"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
)
// Topic is an instrumented wrapper for cloud.google.com/go/pubsub.Topic that traces Publish() calls
// and ensures Instana trace propagation across the Pub/Sub producers and consumers.
//
// See https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#Topic for further details on wrapped type.
type Topic struct {
*pubsub.Topic
projectID string
sensor *instana.Sensor
}
// Publish adds the trace context found in ctx to the message and publishes it to the wrapped topic.
// The exit span created for this operation will be finished only after the message was submitted to
// the server.
//
// See https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#Topic.Publish for further details on wrapped method.
func (top *Topic) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
parent, ok := instana.SpanFromContext(ctx)
if !ok {
return top.Topic.Publish(ctx, msg)
}
sp := parent.Tracer().StartSpan("gcps",
ext.SpanKindProducer,
opentracing.ChildOf(parent.Context()),
opentracing.Tags{
tags.GcpsOp: "PUBLISH",
tags.GcpsProjid: top.projectID,
tags.GcpsTop: top.ID(),
},
)
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
sp.Tracer().Inject(sp.Context(), opentracing.TextMap, opentracing.TextMapCarrier(msg.Attributes))
res := top.Topic.Publish(ctx, msg)
go func() {
id, err := res.Get(context.Background())
if err != nil {
sp.LogFields(otlog.Error(err))
} else {
sp.SetTag("gcps.msgid", id)
}
sp.Finish()
}()
return res
}
// Subscriptions calls Subscriptions() of underlying Topic and wraps the returned subscription iterator.
//
// See https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#Topic.Subscriptions for further details on wrapped method.
func (top *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
return &SubscriptionIterator{top.Topic.Subscriptions(ctx), top.projectID, top.sensor}
}
// TopicIterator is a wrapper for cloud.google.com/go/pubsub.TopicIterator that retrieves and instruments
// topics in a project.
//
// See https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#TopicIterator for further details on wrapped type.
type TopicIterator struct {
*pubsub.TopicIterator
projectID string
sensor *instana.Sensor
}
// Next fetches the next topic in project via the wrapped TopicIterator and returns its wrapped version.
//
// See https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#TopicIterator.Next for further details on wrapped method.
func (it *TopicIterator) Next() (*Topic, error) {
top, err := it.TopicIterator.Next()
return &Topic{top, it.projectID, it.sensor}, err
}