Skip to content

Commit

Permalink
Merge pull request #22 from usedatabrew/nats-rabbit-sinks-improvements
Browse files Browse the repository at this point in the history
feat(sinks): add auth mechanics for nats, exchange name as optional f…
  • Loading branch information
le-vlad committed Feb 7, 2024
2 parents 9c09cd2 + 136b466 commit a20168b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
6 changes: 4 additions & 2 deletions internal/sinks/nats/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package nats

type Config struct {
Url string `json:"url"`
Subject string `json:"subject"`
Url string `json:"url" yaml:"url"`
Subject string `json:"subject" yaml:"subject"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
}
8 changes: 7 additions & 1 deletion internal/sinks/nats/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ func NewNatsSinkPlugin(config Config, schema []schema.StreamSchema, appCtx *stre
}

func (s *SinkPlugin) Connect(ctx context.Context) error {
client, err := nats.Connect(s.config.Url)
var opt nats.Option

if s.config.Username != "" {
opt = nats.UserInfo(s.config.Username, s.config.Password)
}

client, err := nats.Connect(s.config.Url, opt)

if err != nil {
return err
Expand Down
8 changes: 7 additions & 1 deletion internal/sinks/rabbit_mq/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,17 @@ func (s *SinkPlugin) Connect(ctx context.Context) error {
}

func (s *SinkPlugin) Write(message *message.Message) error {
var opts func(*rabbitmq.PublishOptions)

if s.config.ExchangeName != "" {
opts = rabbitmq.WithPublishOptionsExchange(s.config.ExchangeName)
}

return s.publisher.Publish(
[]byte(message.AsJSONString()),
[]string{s.config.RoutingKey},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsExchange(s.config.ExchangeName),
opts,
)
}

Expand Down

0 comments on commit a20168b

Please sign in to comment.