Skip to content

Commit

Permalink
Merge pull request #355 from nats-io/add_nats_opts
Browse files Browse the repository at this point in the history
[ADDED] NatsOptions to configure the NATS connection on stan.Connect()
  • Loading branch information
kozlovic committed Jul 28, 2021
2 parents 1f563ca + a53d804 commit 1bf5a2b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
36 changes: 32 additions & 4 deletions stan.go
Expand Up @@ -137,8 +137,16 @@ type Options struct {
// the NATS streaming connection does NOT close this NATS connection.
// It is the responsibility of the application to manage the lifetime of
// the supplied NATS connection.
//
// DEPRECATED: Users should provide NATS options through NatsOptions()
// instead to configure the underlying NATS connection.
NatsConn *nats.Conn

// NatsOptions is an array of NATS options to configure the NATS connection
// that will be created and owned by the library. Note that some options
// may be overridden by the library.
NatsOptions []nats.Option

// ConnectTimeout is the timeout for the initial Connect(). This value is also
// used for some of the internal request/replies with the cluster.
ConnectTimeout time.Duration
Expand Down Expand Up @@ -246,13 +254,26 @@ func MaxPubAcksInflight(max int) Option {
// NatsConn is an Option to set the underlying NATS connection to be used
// by a streaming connection object. When such option is set, closing the
// streaming connection does not close the provided NATS connection.
//
// DEPRECATED: Users should use NatsOptions instead to configure the
// underlying NATS Connection created by the Streaming connection.
func NatsConn(nc *nats.Conn) Option {
return func(o *Options) error {
o.NatsConn = nc
return nil
}
}

// NatsOptions is an Option to provide the NATS options that will be used
// to create the underlying NATS connection to be used by a streaming
// connection object.
func NatsOptions(opts ...nats.Option) Option {
return func(o *Options) error {
o.NatsOptions = append([]nats.Option(nil), opts...)
return nil
}
}

// Pings is an Option to set the ping interval and max out values.
// The interval needs to be at least 1 and represents the number
// of seconds.
Expand Down Expand Up @@ -357,15 +378,22 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
c.nc = c.opts.NatsConn
// Create a NATS connection if it doesn't exist.
if c.nc == nil {
nopts := c.opts.NatsOptions
nopts = append(nopts, nats.MaxReconnects(-1), nats.ReconnectBufSize(-1))
// Set name only if not provided by the user...
var do nats.Options
for _, o := range nopts {
o(&do)
}
if do.Name == "" {
nopts = append(nopts, nats.Name(clientID))
}
// We will set the max reconnect attempts to -1 (infinite)
// and the reconnect buffer to -1 to prevent any buffering
// (which may cause a published message to be flushed on
// reconnect while the API may have returned an error due
// to PubAck timeout.
nc, err := nats.Connect(c.opts.NatsURL,
nats.Name(clientID),
nats.MaxReconnects(-1),
nats.ReconnectBufSize(-1))
nc, err := nats.Connect(c.opts.NatsURL, nopts...)
if err != nil {
return nil, err
}
Expand Down
25 changes: 25 additions & 0 deletions stan_test.go
Expand Up @@ -2736,3 +2736,28 @@ func TestSubTimeout(t *testing.T) {
t.Fatalf("Unexpected sub close request: %+v", req)
}
}

func TestNatsOptions(t *testing.T) {
snopts := natsd.DefaultTestOptions
snopts.Username = "foo"
snopts.Password = "bar"
ns := natsd.RunServer(&snopts)
defer ns.Shutdown()

opts := server.GetDefaultOptions()
opts.NATSServerURL = "nats://foo:bar@127.0.0.1:4222"
opts.ID = clusterName
s := runServerWithOpts(opts)
defer s.Shutdown()

sc, err := Connect(clusterName, clientName,
NatsOptions(nats.UserInfo("foo", "bar"), nats.Name("test")))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer sc.Close()

if n := sc.NatsConn().Opts.Name; n != "test" {
t.Fatalf("Name was not used: %q instead of %q", n, "test")
}
}

0 comments on commit 1bf5a2b

Please sign in to comment.