/
bus.go
138 lines (116 loc) · 3.3 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package nats
import (
"context"
"errors"
"fmt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/vectrum-io/strongforce/pkg/bus"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
)
// Bus combines a Subscriber and a Broadcaster behind a single type and keeps
// the Options it was created with.
type Bus struct {
// subscriber handles Subscribe calls.
subscriber *Subscriber
// broadcaster handles Publish calls.
broadcaster *Broadcaster
// options is the configuration passed to New; Migrate reads NATSAddress and
// Streams from it.
options *Options
// logger is the sugared form of options.Logger, used for Migrate progress logs.
logger *zap.SugaredLogger
}
// Options configures a Bus created with New.
type Options struct {
// NATSAddress is handed to the subscriber, the broadcaster and nats.Connect.
NATSAddress string
// Logger is the base logger; New replaces a nil value with zap.L().
Logger *zap.Logger
// Streams lists the stream configurations that Migrate creates or updates.
Streams []nats.StreamConfig
// OTelPropagator is forwarded unchanged to the subscriber and broadcaster
// options; presumably used for trace-context propagation — confirm there.
OTelPropagator propagation.TextMapPropagator
}
// New constructs a Bus whose subscriber and broadcaster are both created with
// options.NATSAddress and options.OTelPropagator.
//
// When options.Logger is nil it is set to the global zap logger (zap.L()); the
// assignment is made on the caller's Options value, which the Bus retains.
// A nil options pointer yields an error instead of a nil-pointer panic.
// An error from creating the subscriber or the broadcaster is returned as is.
func New(options *Options) (*Bus, error) {
	if options == nil {
		return nil, errors.New("nats bus options must not be nil")
	}
	if options.Logger == nil {
		options.Logger = zap.L()
	}

	// Derive the sugared logger once so the broadcaster and the bus share it.
	logger := options.Logger.Sugar()

	subscriber, err := NewSubscriber(&SubscriberOptions{
		NATSAddress:    options.NATSAddress,
		OTelPropagator: options.OTelPropagator,
	})
	if err != nil {
		return nil, err
	}

	broadcaster, err := NewBroadcaster(&BroadcasterOptions{
		NATSAddress:    options.NATSAddress,
		Logger:         logger,
		OTelPropagator: options.OTelPropagator,
	})
	if err != nil {
		// NOTE(review): the subscriber created above is not released on this
		// path; confirm whether Subscriber exposes a way to close it.
		return nil, err
	}

	return &Bus{
		subscriber:  subscriber,
		broadcaster: broadcaster,
		options:     options,
		logger:      logger,
	}, nil
}
// Publish hands the outbound message to the bus's broadcaster and returns
// whatever error the broadcaster reports.
func (b *Bus) Publish(ctx context.Context, message *bus.OutboundMessage) error {
	err := b.broadcaster.Broadcast(ctx, message)
	return err
}
// Subscribe creates a consumer named subscriberName on the given stream and
// returns the resulting subscription.
//
// The supplied options are applied on top of bus.DefaultSubscriptionOptions
// and then translated into SubscribeOpts:
//   - DeliveryPolicy maps bus.DeliverAll / bus.DeliverNew to the matching
//     jetstream policy; any other value leaves the policy at its zero value.
//   - GuaranteeOrder sets MaxAckPending to 1, otherwise it is -1.
//   - Durable reuses subscriberName as the durable name, otherwise the
//     durable name stays empty.
//
// On failure the subscriber's error is returned together with a nil
// subscription.
func (b *Bus) Subscribe(ctx context.Context, subscriberName string, stream string, opts ...bus.SubscribeOption) (*bus.Subscription, error) {
	settings := bus.DefaultSubscriptionOptions
	for i := range opts {
		opts[i](&settings)
	}

	var policy jetstream.DeliverPolicy
	if settings.DeliveryPolicy == bus.DeliverAll {
		policy = jetstream.DeliverAllPolicy
	} else if settings.DeliveryPolicy == bus.DeliverNew {
		policy = jetstream.DeliverNewPolicy
	}

	consumer := &SubscribeOpts{
		ConsumerName:    subscriberName,
		CreateConsumer:  true,
		DeliverPolicy:   &policy,
		FilterSubjects:  settings.FilterSubjects,
		MaxDeliverTries: settings.MaxDeliveryTries,
		MaxAckPending:   -1,
	}
	if settings.GuaranteeOrder {
		consumer.MaxAckPending = 1
	}
	if settings.Durable {
		consumer.DurableName = subscriberName
	}

	sub, subErr := b.subscriber.Subscribe(ctx, stream, consumer)
	if subErr != nil {
		return nil, subErr
	}
	return sub, nil
}
// Migrate makes sure every stream listed in the bus options exists on the
// NATS server: a stream that is not found is created, and one that already
// exists is updated with the configured settings.
//
// It opens its own connection to options.NATSAddress and closes it before
// returning. ctx is accepted for interface compatibility but is not used by
// the JetStream calls below. The first failure aborts the migration and is
// returned wrapped with a description of the step that failed.
func (b *Bus) Migrate(ctx context.Context) error {
	conn, err := nats.Connect(b.options.NATSAddress)
	if err != nil {
		return fmt.Errorf("failed to connect to nats: %w", err)
	}
	// The connection is only needed for the migration; release it on every
	// return path so repeated calls do not leak connections.
	defer conn.Close()

	js, err := conn.JetStream()
	if err != nil {
		return fmt.Errorf("failed to get jetstream context: %w", err)
	}

	for _, streamConfig := range b.options.Streams {
		b.logger.Infof("validating nats stream %s", streamConfig.Name)

		_, err := js.StreamInfo(streamConfig.Name)
		switch {
		case err == nil:
			b.logger.Infof("updating existing stream %s", streamConfig.Name)
			if _, err := js.UpdateStream(&streamConfig); err != nil {
				return fmt.Errorf("failed to update stream: %w", err)
			}
		case errors.Is(err, nats.ErrStreamNotFound):
			// create new stream
			b.logger.Infof("creating new stream %s", streamConfig.Name)
			if _, err := js.AddStream(&streamConfig); err != nil {
				return fmt.Errorf("failed to add stream: %w", err)
			}
		default:
			return fmt.Errorf("failed to get stream info: %w", err)
		}
	}

	return nil
}