From 20fb507cd1104eb7d1a5d29344506e92ea759e54 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 28 Feb 2023 13:44:15 +0100 Subject: [PATCH] [ADDED] Exponential backoff for server reconnects --- nats.go | 68 ++++++++++++++++++++++++++++++++---------- nats_test.go | 61 ++++++++++++++++++++++++++++++++----- test/conn_test.go | 1 + test/js_test.go | 2 +- test/reconnect_test.go | 10 ------- 5 files changed, 107 insertions(+), 35 deletions(-) diff --git a/nats.go b/nats.go index ee53ea910..85365ce94 100644 --- a/nats.go +++ b/nats.go @@ -50,8 +50,7 @@ const ( Version = "1.24.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 - DefaultMaxReconnect = 60 - DefaultReconnectWait = 2 * time.Second + DefaultMaxReconnect = -1 DefaultReconnectJitter = 100 * time.Millisecond DefaultReconnectJitterTLS = time.Second DefaultTimeout = 2 * time.Second @@ -62,6 +61,10 @@ const ( RequestChanLen = 8 DefaultDrainTimeout = 30 * time.Second LangString = "go" + + // Deprecated: Client now uses [nats.DefaultReconnectBackoffHandler] to + // handle the default reconnect wait time. + DefaultReconnectWait = 2 * time.Second ) const ( @@ -143,17 +146,17 @@ var ( // GetDefaultOptions returns default configuration options for the client. 
func GetDefaultOptions() Options { return Options{ - AllowReconnect: true, - MaxReconnect: DefaultMaxReconnect, - ReconnectWait: DefaultReconnectWait, - ReconnectJitter: DefaultReconnectJitter, - ReconnectJitterTLS: DefaultReconnectJitterTLS, - Timeout: DefaultTimeout, - PingInterval: DefaultPingInterval, - MaxPingsOut: DefaultMaxPingOut, - SubChanLen: DefaultMaxChanLen, - ReconnectBufSize: DefaultReconnectBufSize, - DrainTimeout: DefaultDrainTimeout, + AllowReconnect: true, + MaxReconnect: DefaultMaxReconnect, + ReconnectJitter: DefaultReconnectJitter, + ReconnectJitterTLS: DefaultReconnectJitterTLS, + Timeout: DefaultTimeout, + PingInterval: DefaultPingInterval, + MaxPingsOut: DefaultMaxPingOut, + SubChanLen: DefaultMaxChanLen, + ReconnectBufSize: DefaultReconnectBufSize, + DrainTimeout: DefaultDrainTimeout, + IgnoreAuthErrorAbort: true, } } @@ -470,6 +473,7 @@ type Options struct { // IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting // subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). + // Deprecated: This option will be removed in future releases. IgnoreAuthErrorAbort bool // SkipHostLookup skips the DNS lookup for the server hostname. @@ -1260,6 +1264,7 @@ func CustomInboxPrefix(p string) Option { // IgnoreAuthErrorAbort opts out of the default connect behavior of aborting // subsequent reconnect attempts if server returns the same auth error twice. +// Deprecated: This option is now set to 'true' by default, therefore this option will be removed in future releases. func IgnoreAuthErrorAbort() Option { return func(o *Options) error { o.IgnoreAuthErrorAbort = true @@ -1267,6 +1272,14 @@ func IgnoreAuthErrorAbort() Option { } } +// AbortOnAuthErrors causes the client to bail out after 2 subsequent auth connection errors. 
+func AbortOnAuthErrors() Option { + return func(o *Options) error { + o.IgnoreAuthErrorAbort = false + return nil + } +} + // SkipHostLookup is an Option to skip the host lookup when connecting to a server. func SkipHostLookup() Option { return func(o *Options) error { @@ -2559,6 +2572,28 @@ func (nc *Conn) stopPingTimer() { } } +// DefaultReconnectBackoffHandler returns a ReconnectDelayHandler implementing the default exponential backoff. +// Base reconnect wait is 10ms, with x2 multiplier. Max wait time is 2 minutes. +// 10ms, 20ms, 40ms, 80ms...2m +// A fresh random jitter in [0, jitter) is added to every returned delay. +func DefaultReconnectBackoffHandler(jitter time.Duration) ReconnectDelayHandler { + return func(attempts int) time.Duration { + // base interval is 10ms + backoff := 10 * time.Millisecond + for i := 0; i < attempts-1; i++ { + backoff *= 2 + if backoff > 2*time.Minute { + backoff = 2 * time.Minute + break + } + } + if jitter > 0 { + backoff += time.Duration(rand.Int63n(int64(jitter))) // never overwrite the captured jitter bound: it must stay constant across calls + } + return backoff + } +} + // Try to reconnect using the option parameters. // This function assumes we are allowed to reconnect. func (nc *Conn) doReconnect(err error) { @@ -2596,11 +2631,10 @@ func (nc *Conn) doReconnect(err error) { var wlf int var jitter time.Duration - var rw time.Duration // If a custom reconnect delay handler is set, this takes precedence. crd := nc.Opts.CustomReconnectDelayCB - if crd == nil { - rw = nc.Opts.ReconnectWait + rw := nc.Opts.ReconnectWait + if crd == nil && rw == 0 { // TODO: since we sleep only after the whole list has been tried, we can't // rely on individual *srv to know if it is a TLS or non-TLS url. 
// We have to pick which type of jitter to use, for now, we use these hints: @@ -2608,6 +2642,8 @@ func (nc *Conn) doReconnect(err error) { if nc.Opts.Secure || nc.Opts.TLSConfig != nil { jitter = nc.Opts.ReconnectJitterTLS } + + crd = DefaultReconnectBackoffHandler(jitter) } for i := 0; len(nc.srvPool) > 0; { diff --git a/nats_test.go b/nats_test.go index 33b2df44b..64ce719b2 100644 --- a/nats_test.go +++ b/nats_test.go @@ -511,6 +511,7 @@ func TestSelectNextServer(t *testing.T) { opts := GetDefaultOptions() opts.Servers = testServers opts.NoRandomize = true + opts.MaxReconnect = 60 nc := &Conn{Opts: opts} if err := nc.setupServerPool(); err != nil { t.Fatalf("Problem setting up Server Pool: %v\n", err) @@ -1609,14 +1610,14 @@ func TestExpiredAuthentication(t *testing.T) { name string expectedProto string expectedErr error - ignoreAbort bool + withAuthAbort bool }{ - {"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false}, - {"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false}, - {"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false}, {"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, true}, {"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, true}, {"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, true}, + {"expired users credentials, keep reconnecting", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false}, + {"revoked users credentials, keep reconnecting", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false}, + {"expired account, keep reconnecting", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false}, } { t.Run(test.name, func(t *testing.T) { l, e := net.Listen("tcp", "127.0.0.1:0") @@ -1678,8 +1679,8 @@ func TestExpiredAuthentication(t *testing.T) { ch <- true }), } - if test.ignoreAbort { - opts = append(opts, IgnoreAuthErrorAbort()) + if test.withAuthAbort { + opts = append(opts, 
AbortOnAuthErrors()) } nc, err := Connect(url, opts...) if err != nil { @@ -1687,7 +1688,7 @@ func TestExpiredAuthentication(t *testing.T) { } defer nc.Close() - if test.ignoreAbort { + if !test.withAuthAbort { // We expect more than 3 errors, as the connect attempt should not be aborted after 2 failed attempts. for i := 0; i < 4; i++ { select { @@ -2171,7 +2172,7 @@ func BenchmarkNextMsgNoTimeout(b *testing.B) { } } -func TestAuthErrorOnReconnect(t *testing.T) { +func TestAuthErrorOnReconnectWithAuthErrorAbort(t *testing.T) { // This is a bit of an artificial test, but it is to demonstrate // that if the client is disconnected from a server (not due to an auth error), // it will still correctly stop the reconnection logic if it gets twice an @@ -2199,6 +2200,7 @@ func TestAuthErrorOnReconnect(t *testing.T) { MaxReconnects(-1), DontRandomize(), ErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}), + AbortOnAuthErrors(), DisconnectErrHandler(func(_ *Conn, e error) { dch <- true }), @@ -2948,6 +2950,49 @@ func TestInProcessConn(t *testing.T) { } } +func TestDefaultReconnectBackoffHandler(t *testing.T) { + tests := []struct { + name string + attempts int + jitter time.Duration + expectedRange []time.Duration + }{ + { + name: "4 attempts, no jitter", + attempts: 4, + expectedRange: []time.Duration{80 * time.Millisecond}, + }, + { + name: "1 attempt, no jitter, return base value", + attempts: 1, + expectedRange: []time.Duration{10 * time.Millisecond}, + }, + { + name: "100 attempts, no jitter, return max", + attempts: 100, + expectedRange: []time.Duration{2 * time.Minute}, + }, + { + name: "4 attempts, with jitter", + attempts: 4, + jitter: 20 * time.Millisecond, + expectedRange: []time.Duration{80 * time.Millisecond, 100 * time.Millisecond}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cb := DefaultReconnectBackoffHandler(test.jitter) + res := cb(test.attempts) + lo := test.expectedRange[0] + hi := test.expectedRange[len(test.expectedRange)-1] // a single-element range demands an exact match + if res < lo || res > hi { + 
t.Fatalf("Invalid result; want: [%s, %s]; got: %s", lo, hi, res) + } + }) + } +} + func TestServerListWithTrailingComma(t *testing.T) { s := RunServerOnPort(-1) defer s.Shutdown() diff --git a/test/conn_test.go b/test/conn_test.go index 87532ac66..dbdbb48ec 100644 --- a/test/conn_test.go +++ b/test/conn_test.go @@ -2564,6 +2564,7 @@ func TestRetryOnFailedConnect(t *testing.T) { nats.RetryOnFailedConnect(true), nats.MaxReconnects(-1), nats.ReconnectWait(15*time.Millisecond), + nats.AbortOnAuthErrors(), nats.ReconnectHandler(func(_ *nats.Conn) { ch <- true }), diff --git a/test/js_test.go b/test/js_test.go index 8e4a5f574..0030063b2 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -8629,7 +8629,7 @@ func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) { } hbMissed <- struct{}{} } - nc, js := jsClient(t, s, nats.ErrorHandler(errHandler)) + nc, js := jsClient(t, s, nats.ErrorHandler(errHandler), nats.ReconnectWait(500*time.Millisecond)) defer nc.Close() if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil { diff --git a/test/reconnect_test.go b/test/reconnect_test.go index d1023a21d..e50dc17af 100644 --- a/test/reconnect_test.go +++ b/test/reconnect_test.go @@ -30,15 +30,6 @@ func startReconnectServer(t *testing.T) *server.Server { return RunServerOnPort(22222) } -func TestReconnectTotalTime(t *testing.T) { - opts := nats.GetDefaultOptions() - totalReconnectTime := time.Duration(opts.MaxReconnect) * opts.ReconnectWait - if totalReconnectTime < (2 * time.Minute) { - t.Fatalf("Total reconnect time should be at least 2 mins: Currently %v\n", - totalReconnectTime) - } -} - func TestDefaultReconnectJitter(t *testing.T) { opts := nats.GetDefaultOptions() if opts.ReconnectJitter != nats.DefaultReconnectJitter { @@ -123,7 +114,6 @@ var reconnectOpts = nats.Options{ Url: "nats://127.0.0.1:22222", AllowReconnect: true, MaxReconnect: 10, - ReconnectWait: 100 * time.Millisecond, Timeout: 
nats.DefaultTimeout, }