-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
57 lines (48 loc) · 1.79 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package graph
import (
"context"
"cloud.google.com/go/pubsub"
"github.com/google/uuid"
"github.com/nais/api/internal/slug"
"github.com/nais/api/pkg/protoapi"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"
)
// triggerTeamUpdatedEvent hands an EVENT_TEAM_UPDATED event for teamSlug to
// triggerEvent, passing correlationID through unchanged.
func (r *Resolver) triggerTeamUpdatedEvent(ctx context.Context, teamSlug slug.Slug, correlationID uuid.UUID) {
	payload := &protoapi.EventTeamUpdated{
		Slug: teamSlug.String(),
	}
	r.triggerEvent(ctx, protoapi.EventTypes_EVENT_TEAM_UPDATED, payload, correlationID)
}
// triggerTeamDeletedEvent hands an EVENT_TEAM_DELETED event for teamSlug to
// triggerEvent, passing correlationID through unchanged.
func (r *Resolver) triggerTeamDeletedEvent(ctx context.Context, teamSlug slug.Slug, correlationID uuid.UUID) {
	payload := &protoapi.EventTeamDeleted{
		Slug: teamSlug.String(),
	}
	r.triggerEvent(ctx, protoapi.EventTypes_EVENT_TEAM_DELETED, payload, correlationID)
}
// triggerEvent marshals msg and publishes it on r.pubsubTopic inside a
// producer span named "trigger pubsub event".
//
// The published message carries the attributes "CorrelationID" and
// "EventType", plus whatever keys the globally registered OpenTelemetry
// text-map propagator injects for the current span context.
//
// The function waits on the publish result via Get(ctx). A publish failure is
// recorded on the span and otherwise swallowed: nothing is returned to the
// caller. A failure to marshal msg is recorded on the span and then panics.
func (r *Resolver) triggerEvent(ctx context.Context, event protoapi.EventTypes, msg proto.Message, correlationID uuid.UUID) {
	ctx, span := otel.Tracer("").Start(ctx, "trigger pubsub event",
		trace.WithSpanKind(trace.SpanKindProducer),
		trace.WithAttributes(
			semconv.EventName(event.String()),
			semconv.MessagingDestinationNameKey.String(r.pubsubTopic.String()),
			semconv.MessagingSystemGCPPubsub,
		),
	)
	// Deferred so the span is ended on every exit path, including the panic below.
	defer span.End()

	b, err := proto.Marshal(msg)
	if err != nil {
		// RecordError only attaches an exception event; the status has to be
		// set explicitly for the span to be marked as failed, matching the
		// publish-failure path further down.
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		panic(err)
	}

	attrs := map[string]string{
		"CorrelationID": correlationID.String(),
		"EventType":     event.String(),
	}
	// Add trace-context keys to the message attributes so the span context
	// travels with the message.
	otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(attrs))

	id, err := r.pubsubTopic.Publish(ctx, &pubsub.Message{
		Data:       b,
		Attributes: attrs,
	}).Get(ctx)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return
	}

	span.SetAttributes(semconv.MessagingMessageID(id))
}