/
client.go
129 lines (101 loc) · 2.92 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package eventbus
import (
"context"
"encoding/json"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
events "github.com/metal-toolbox/governor-api/pkg/events/v1alpha1"
"github.com/nats-io/nats.go"
)
const (
	// defaultSubject is the subject prefix NewClient assigns to a Client
	// when no WithNATSPrefix option overrides it.
	defaultSubject = "events"

	// natsTracerName is the instrumentation name NewClient passes to the
	// global OpenTelemetry tracer provider to obtain the Client's tracer.
	natsTracerName = "github.com/metal-toolbox/governor-api:nats"
)
// conn is the connection behaviour the Client depends on. Keeping it as a
// small interface lets any value with these three methods be handed to
// WithNATSConn (presumably *nats.Conn in production — confirm against the
// callers).
type conn interface {
	// Publish takes a subject and a raw payload. It is part of the
	// interface but is not called by the code in this file.
	Publish(subject string, data []byte) error

	// PublishMsg takes a complete message; Client.Publish uses it so that
	// headers travel with the payload.
	PublishMsg(msg *nats.Msg) error

	// Drain is invoked by Client.Shutdown.
	Drain() error
}
// Client publishes governor events through a configured connection.
// Build one with NewClient and customise it with Option values.
type Client struct {
	// conn is the connection used by Publish and Shutdown; it is set by
	// WithNATSConn and is nil until then.
	conn conn
	// logger receives the log lines emitted while publishing. NewClient
	// defaults it to a no-op logger.
	logger *zap.Logger
	// prefix is joined with "." in front of the subject given to Publish.
	prefix string
	// tracer starts the span that wraps each Publish call.
	tracer trace.Tracer
}
// Option customises a Client while it is being built by NewClient. Each
// Option receives the Client under construction and mutates it in place.
type Option func(*Client)
// NewClient builds an event bus Client. The Client starts with a no-op
// logger, the default subject prefix and a tracer obtained from the global
// OpenTelemetry tracer provider; the supplied options are then applied in
// the order given. No connection is created here: provide one with
// WithNATSConn.
func NewClient(opts ...Option) *Client {
	c := &Client{
		logger: zap.NewNop(),
		prefix: defaultSubject,
		tracer: otel.GetTracerProvider().Tracer(natsTracerName),
	}

	for i := range opts {
		opts[i](c)
	}

	return c
}
// WithNATSConn returns an Option that stores nc as the connection the
// Client publishes and drains through.
func WithNATSConn(nc conn) Option {
	return func(client *Client) { client.conn = nc }
}
// WithNATSPrefix returns an Option that replaces the default subject
// prefix with p. Publish places the prefix, followed by ".", in front of
// every subject it is given.
func WithNATSPrefix(p string) Option {
	return func(client *Client) { client.prefix = p }
}
// WithLogger returns an Option that makes the Client log through l.
//
// A nil logger is ignored so the Client keeps the logger it already has
// (the no-op logger installed by NewClient, unless an earlier option
// changed it). Storing nil would make the first Publish call panic when it
// logs.
func WithLogger(l *zap.Logger) Option {
	return func(c *Client) {
		if l == nil {
			return
		}

		c.logger = l
	}
}
// Shutdown drains the Client's connection and returns the error reported
// by Drain (with a NATS connection, draining presumably also closes it —
// see the nats.go documentation).
//
// A Client built without WithNATSConn has no connection; in that case
// there is nothing to drain and Shutdown returns nil instead of panicking
// on the nil interface.
func (c *Client) Shutdown() error {
	if c.conn == nil {
		return nil
	}

	return c.conn.Drain()
}
// Publish serialises event as JSON and publishes it on the subject
// "<prefix>.<sub>", where prefix is the Client's configured subject prefix.
//
// The call is wrapped in an "events.nats.PublishEvent" span. The span's
// trace context is injected into event.TraceContext (mutating the caller's
// event) so a subscriber can continue the trace from the publish span. If
// ctx carries a correlation ID it is attached to the message as the
// events.GovernorEventCorrelationIDHeader header.
//
// Publish returns ErrEmptyEvent when event is nil, the json.Marshal error
// when the event cannot be encoded, or the error returned by the
// connection's PublishMsg. Both of the latter are also recorded on the span.
func (c *Client) Publish(ctx context.Context, sub string, event *events.Event) error {
	if event == nil {
		return ErrEmptyEvent
	}

	subject := c.prefix + "." + sub

	c.logger.Info("publishing event to the event bus", zap.String("subject", subject), zap.Any("action", event.Action))

	// Keep the context returned by Start: it carries the new span, and it
	// is that span (not the caller's) that must be propagated below.
	ctx, span := c.tracer.Start(ctx, "events.nats.PublishEvent", trace.WithAttributes(
		attribute.String("events.action", event.Action),
		attribute.String("event.subject", subject),
		attribute.String("event.actor_id", event.ActorID),
	))
	defer span.End()

	// Propagate trace context into the message for the subscriber.
	carrier := propagation.MapCarrier{}
	otel.GetTextMapPropagator().Inject(ctx, carrier)
	event.TraceContext = carrier

	payload, err := json.Marshal(event)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())

		return err
	}

	headers := nats.Header{}

	// ctx is derived from the caller's context, so values stored on the
	// original (such as the correlation ID) are still visible here.
	if cid := events.ExtractCorrelationID(ctx); cid != "" {
		c.logger.Debug("publishing event with correlation ID", zap.String("correlationID", cid))
		span.SetAttributes(attribute.String("event.correlation_id", cid))
		headers.Add(events.GovernorEventCorrelationIDHeader, cid)
	}

	msg := &nats.Msg{
		Subject: subject,
		Data:    payload,
		Header:  headers,
	}

	// Mark the span as failed when the publish fails, matching the
	// handling of marshal failures above.
	if err := c.conn.PublishMsg(msg); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())

		return err
	}

	return nil
}