-
Notifications
You must be signed in to change notification settings - Fork 0
/
adapter.go
97 lines (75 loc) · 1.99 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package natstarget
import (
"context"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/zeiss/typhoon/pkg/apis/targets"
"go.uber.org/zap"
pkgadapter "knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/logging"
)
// EnvAccessorCtor returns an empty envAccessor for the adapter framework to
// populate with this adapter's configuration parameters.
func EnvAccessorCtor() pkgadapter.EnvConfigAccessor {
	return new(envAccessor)
}
// envAccessor holds the adapter configuration read from the environment.
//
// The NATS fields must be exported: envconfig populates fields through
// reflection and silently skips any field it cannot set, which includes all
// unexported fields.
type envAccessor struct {
	pkgadapter.EnvConfig

	// URL is the NATS server URL, read from NATS_URL.
	URL string `envconfig:"NATS_URL"`
	// Subject is the subject events are published to, read from NATS_SUBJECT.
	Subject string `envconfig:"NATS_SUBJECT"`
}

// Compile-time check that natsAdapter satisfies pkgadapter.Adapter.
var _ pkgadapter.Adapter = (*natsAdapter)(nil)

// natsAdapter receives CloudEvents and publishes their payload to a NATS
// JetStream subject.
type natsAdapter struct {
	// client receives the incoming CloudEvents.
	client cloudevents.Client
	// js publishes messages through JetStream.
	js jetstream.JetStream
	// conn is the underlying NATS connection; it is closed when Start returns.
	conn *nats.Conn
	// subject is the subject every event payload is published to.
	subject string
	logger  *zap.SugaredLogger
	mt      *pkgadapter.MetricTag
}

// NewTarget builds the NATS target adapter.
//
// It connects to the NATS server configured in envAcc and creates a JetStream
// handle on that connection. envAcc must be the *envAccessor produced by
// EnvAccessorCtor; any other type makes the type assertion panic. A failure to
// connect to NATS or to set up JetStream is logged and panics.
func NewTarget(ctx context.Context, envAcc pkgadapter.EnvConfigAccessor, client cloudevents.Client) pkgadapter.Adapter {
	logger := logging.FromContext(ctx)

	mt := &pkgadapter.MetricTag{
		// NOTE(review): this tags a NATS adapter with the Kafka target
		// resource, which looks like a copy-paste leftover. Confirm whether
		// the targets package has a dedicated NATS resource to use instead.
		ResourceGroup: targets.KafkaTargetResource.String(),
		Namespace:     envAcc.GetNamespace(),
		Name:          envAcc.GetName(),
	}

	env := envAcc.(*envAccessor)

	nc, err := nats.Connect(env.URL)
	if err != nil {
		logger.Panicw("failed to connect to NATS", zap.Error(err))
	}

	js, err := jetstream.New(nc)
	if err != nil {
		// Release the connection before giving up so it is not leaked if the
		// panic is recovered further up the stack.
		nc.Close()
		logger.Panicw("failed to connect to JetStream", zap.Error(err))
	}

	return &natsAdapter{
		conn:    nc,
		mt:      mt,
		js:      js,
		client:  client,
		subject: env.Subject,
		// Start logs through this field; leaving it nil makes Start panic.
		logger: logger,
	}
}
// Start is the adapter's main entrypoint. It starts the CloudEvents receiver
// with dispatch as the handler and returns whatever StartReceiver returns.
// The NATS connection is closed when Start returns.
func (a *natsAdapter) Start(ctx context.Context) error {
	a.logger.Info("starting NATS.io adapter")

	// Tear down the NATS connection once the receiver stops.
	defer a.conn.Close()

	return a.client.StartReceiver(ctx, a.dispatch)
}
// dispatch publishes the data of an incoming event to the configured subject
// through JetStream and waits for the outcome of the asynchronous publish.
//
// It returns cloudevents.ResultACK once the publish is acknowledged. It
// returns the error if the publish cannot be started or if the publish future
// reports a failure.
func (a *natsAdapter) dispatch(event cloudevents.Event) cloudevents.Result {
	ack, err := a.js.PublishAsync(a.subject, event.Data())
	if err != nil {
		return err
	}

	select {
	case pubErr := <-ack.Err():
		// Return the error delivered on the channel. The outer err is always
		// nil here, so returning it would report a failed publish as success.
		return pubErr
	case <-ack.Ok():
		return cloudevents.ResultACK
	}
}