diff --git a/stan.go b/stan.go index a8b5631..6458874 100644 --- a/stan.go +++ b/stan.go @@ -137,8 +137,16 @@ type Options struct { // the NATS streaming connection does NOT close this NATS connection. // It is the responsibility of the application to manage the lifetime of // the supplied NATS connection. + // + // DEPRECATED: Users should provide NATS options through NatsOptions() + // instead to configure the underlying NATS connection. NatsConn *nats.Conn + // NatsOptions is an array of NATS options to configure the NATS connection + // that will be created and owned by the library. Note that some options + // may be overridden by the library. + NatsOptions []nats.Option + // ConnectTimeout is the timeout for the initial Connect(). This value is also // used for some of the internal request/replies with the cluster. ConnectTimeout time.Duration @@ -246,6 +254,9 @@ func MaxPubAcksInflight(max int) Option { // NatsConn is an Option to set the underlying NATS connection to be used // by a streaming connection object. When such option is set, closing the // streaming connection does not close the provided NATS connection. +// +// DEPRECATED: Users should use NatsOptions instead to configure the +// underlying NATS Connection created by the Streaming connection. func NatsConn(nc *nats.Conn) Option { return func(o *Options) error { o.NatsConn = nc @@ -253,6 +264,16 @@ func NatsConn(nc *nats.Conn) Option { } } +// NatsOptions is an Option to provide the NATS options that will be used +// to create the underlying NATS connection to be used by a streaming +// connection object. +func NatsOptions(opts ...nats.Option) Option { + return func(o *Options) error { + o.NatsOptions = append([]nats.Option(nil), opts...) + return nil + } +} + // Pings is an Option to set the ping interval and max out values. // The interval needs to be at least 1 and represents the number // of seconds. @@ -357,15 +378,22 @@ func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) { c.nc = c.opts.NatsConn // Create a NATS connection if it doesn't exist. if c.nc == nil { + nopts := c.opts.NatsOptions + nopts = append(nopts, nats.MaxReconnects(-1), nats.ReconnectBufSize(-1)) + // Set name only if not provided by the user... + var do nats.Options + for _, o := range nopts { + o(&do) + } + if do.Name == "" { + nopts = append(nopts, nats.Name(clientID)) + } // We will set the max reconnect attempts to -1 (infinite) // and the reconnect buffer to -1 to prevent any buffering // (which may cause a published message to be flushed on // reconnect while the API may have returned an error due // to PubAck timeout. - nc, err := nats.Connect(c.opts.NatsURL, - nats.Name(clientID), - nats.MaxReconnects(-1), - nats.ReconnectBufSize(-1)) + nc, err := nats.Connect(c.opts.NatsURL, nopts...) if err != nil { return nil, err } diff --git a/stan_test.go b/stan_test.go index f708b24..1c0b734 100644 --- a/stan_test.go +++ b/stan_test.go @@ -2736,3 +2736,28 @@ func TestSubTimeout(t *testing.T) { t.Fatalf("Unexpected sub close request: %+v", req) } } + +func TestNatsOptions(t *testing.T) { + snopts := natsd.DefaultTestOptions + snopts.Username = "foo" + snopts.Password = "bar" + ns := natsd.RunServer(&snopts) + defer ns.Shutdown() + + opts := server.GetDefaultOptions() + opts.NATSServerURL = "nats://foo:bar@127.0.0.1:4222" + opts.ID = clusterName + s := runServerWithOpts(opts) + defer s.Shutdown() + + sc, err := Connect(clusterName, clientName, + NatsOptions(nats.UserInfo("foo", "bar"), nats.Name("test"))) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer sc.Close() + + if n := sc.NatsConn().Opts.Name; n != "test" { + t.Fatalf("Name was not used: %q instead of %q", n, "test") + } +}