-
Notifications
You must be signed in to change notification settings - Fork 67
/
nats_driver.go
87 lines (71 loc) 路 1.93 KB
/
nats_driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package pipeline
import (
"context"
"encoding/json"
"fmt"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
type NatsDriver[T any] struct {
log loggerFn
conn *nats.Conn
topic string
subscription *nats.Subscription
}
func NewNatsDriver[T any](conn *nats.Conn, topic string) *NatsDriver[T] {
return &NatsDriver[T]{
log: newLoggerFn(fmt.Sprintf("NatsDriver - %s", topic)),
conn: conn,
topic: topic,
}
}
func (d *NatsDriver[T]) Enqueue(ctx context.Context, msg T) {
msgJson, err := json.Marshal(msg)
if err != nil {
fmt.Printf("could not marshal message: %s\n", err.Error())
}
header := make(nats.Header)
if propagator := otel.GetTextMapPropagator(); propagator != nil {
propagator.Inject(ctx, propagation.HeaderCarrier(header))
}
err = d.conn.PublishMsg(&nats.Msg{
Subject: d.topic,
Header: header,
Data: msgJson,
})
if err != nil {
fmt.Printf("could not send publish message request: %s\n", err.Error())
}
}
// SetListener implements QueueDriver.
func (d *NatsDriver[T]) SetListener(listener Listener[T]) {
subscription, err := d.conn.Subscribe(d.topic, func(msg *nats.Msg) {
var target T
err := json.Unmarshal(msg.Data, &target)
if err != nil {
fmt.Printf(`could not unmarshal message got in queue "%s": %s\n`, d.topic, err.Error())
}
ctx := context.Background()
if propagator := otel.GetTextMapPropagator(); propagator != nil {
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(msg.Header))
}
// TODO: We probably should return an error for acking or nacking this message
listener.Listen(ctx, target)
msg.Ack()
})
if err != nil {
panic(err)
}
d.subscription = subscription
}
func (d *NatsDriver[T]) Start() {
d.log("start")
}
func (d *NatsDriver[T]) Stop() {
err := d.subscription.Unsubscribe()
if err != nil {
d.log(`could not unsubscribe to topic "%s"\n`, d.topic)
}
d.subscription = nil
}