From 1f212c526c013168f3a818c0637f8a0cc52d7fa2 Mon Sep 17 00:00:00 2001 From: Tim Middleton Date: Mon, 26 Jun 2023 15:37:42 +0800 Subject: [PATCH 1/3] Add session disconnect and retry timeouts --- Makefile | 2 +- README.md | 2 +- coherence/common.go | 4 +- coherence/doc.go | 19 ++- coherence/session.go | 164 +++++++++++++++----- coherence/session_test.go | 48 +++++- examples/events/cache/people_insert/main.go | 2 + test/e2e/standalone/event_test.go | 2 +- test/e2e/standalone/named_map_test.go | 4 +- test/e2e/standalone/session_test.go | 5 +- 10 files changed, 197 insertions(+), 55 deletions(-) diff --git a/Makefile b/Makefile index c1bf271..6ee15ba 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ # ---------------------------------------------------------------------------------------------------------------------- # This is the version of the coherence-go-client -VERSION ?=1.0.0-rc2 +VERSION ?=1.0.0-rc4 CURRDIR := $(shell pwd) USER_ID := $(shell echo "`id -u`:`id -g`") diff --git a/README.md b/README.md index 08cc295..a08a176 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ For local development, we recommend using the Coherence CE Docker image; it cont everything necessary for the client to operate correctly. ```bash -docker run -d -p 1408:1408 ghcr.io/oracle/coherence-ce:22.06.4 +docker run -d -p 1408:1408 -p 30000:30000 ghcr.io/oracle/coherence-ce:22.06.4 ``` ## Installation diff --git a/coherence/common.go b/coherence/common.go index 3d5b595..a7570f7 100644 --- a/coherence/common.go +++ b/coherence/common.go @@ -27,7 +27,9 @@ const ( envTLSClientCert = "COHERENCE_TLS_CLIENT_CERT" envTLSClientKey = "COHERENCE_TLS_CLIENT_KEY" envIgnoreInvalidCerts = "COHERENCE_IGNORE_INVALID_CERTS" - envSessionTimeout = "COHERENCE_SESSION_TIMEOUT" + envRequestTimeout = "COHERENCE_CLIENT_REQUEST_TIMEOUT" + envDisconnectTimeout = "COHERENCE_SESSION_DISCONNECT_TIMEOUT" + envReadyTimeout = "COHERENCE_READY_TIMEOUT" // envSessionDebug enabled session debug messages to be displayed. envSessionDebug = "COHERENCE_SESSION_DEBUG" diff --git a/coherence/doc.go b/coherence/doc.go index 7ed5f16..c168773 100644 --- a/coherence/doc.go +++ b/coherence/doc.go @@ -61,17 +61,26 @@ Refer to the section on [NewSession] for more information on setting up a SSL co See [SessionOptions] which lists all the options supported by the [Session] API. -# Controlling connection timeouts +# Controlling timeouts Most operations you call require you to supply a [context.Context]. If your context does not contain a deadline, the operation will wrap your context in a new [context.WithTimeout] using either the default timeout of 30,000 millis or -the value you set using option [coherence.WithSessionTimeout] when you called [NewSession]. +the value you set using option [coherence.WithRequestTimeout] when you called [NewSession]. -For example, to override the default timeout of 30,000 millis with one of 5 seconds for a [Session] you can do the following: +For example, to override the default request timeout of 30,000 millis with one of 5 seconds for a [Session] you can do the following: - session, err = coherence.NewSession(ctx, coherence.WithSessionTimeout(time.Duration(5) * time.Second)) + session, err = coherence.NewSession(ctx, coherence.WithRequestTimeout(time.Duration(5) * time.Second)) -You can also override the default timeout using the environment variable COHERENCE_SESSION_TIMEOUT. +You can also override the default request timeout using the environment variable COHERENCE_CLIENT_REQUEST_TIMEOUT. + +By default, if an endpoint is not ready, the Go client will fail-fast. You can change this behaviour by setting +the option [coherence.WithReadyTimeout] to a value millis value greater than zero which will cause the Go client +to wait until up to the timeout specified until it fails if no endpoint is available. You can also use the environment variable +COHERENCE_READY_TIMEOUT. + +You also have the ability to control maximum amount of time, in milliseconds, a [Session] may remain in a disconnected state +without successfully reconnecting. For this you use the option [coherence.WithDisconnectTimeout] or the environment +variable COHERENCE_SESSION_DISCONNECT_TIMEOUT. # Obtaining a NamedMap or NamedCache diff --git a/coherence/session.go b/coherence/session.go index 073da1d..4914aee 100644 --- a/coherence/session.go +++ b/coherence/session.go @@ -30,9 +30,11 @@ import ( var ErrInvalidFormat = errors.New("format can only be 'json'") const ( - defaultFormat = "json" - mapOrCacheExists = "the %s %s already exists with different type parameters" - defaultSessionTimeout = "30000" // millis + defaultFormat = "json" + mapOrCacheExists = "the %s %s already exists with different type parameters" + defaultRequestTimeout = "30000" // millis + defaultDisconnectTimeout = "30000" // millis + defaultReadyTimeout = "0" // millis ) // Session provides APIs to create NamedCaches. The [NewSession] method creates a @@ -56,15 +58,17 @@ type Session struct { // SessionOptions holds the session attributes like host, port, tls attributes etc. type SessionOptions struct { - Address string - TLSEnabled bool - Scope string - Format string - ClientCertPath string - ClientKeyPath string - CaCertPath string - PlainText bool - Timeout time.Duration + Address string + TLSEnabled bool + Scope string + Format string + ClientCertPath string + ClientKeyPath string + CaCertPath string + PlainText bool + RequestTimeout time.Duration + DisconnectTimeout time.Duration + ReadyTimeout time.Duration } // NewSession creates a new Session with the specified sessionOptions. @@ -115,9 +119,11 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) ( caches: make(map[string]interface{}, 0), lifecycleListeners: []*SessionLifecycleListener{}, sessOpts: &SessionOptions{ - PlainText: false, - Format: defaultFormat, - Timeout: time.Duration(0) * time.Second}, + PlainText: false, + Format: defaultFormat, + RequestTimeout: time.Duration(0) * time.Second, + ReadyTimeout: time.Duration(0) * time.Second, + DisconnectTimeout: time.Duration(0) * time.Second}, } if getBoolValueFromEnvVarOrDefault(envSessionDebug, false) { @@ -141,14 +147,34 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) ( session.sessOpts.Address = getStringValueFromEnvVarOrDefault(envHostName, "localhost:1408") } - // if no timeout then use the env or default - if session.sessOpts.Timeout == time.Duration(0) { - timeoutString := getStringValueFromEnvVarOrDefault(envSessionTimeout, defaultSessionTimeout) + // if no request timeout then use the env or default + if session.sessOpts.RequestTimeout == time.Duration(0) { + timeoutString := getStringValueFromEnvVarOrDefault(envRequestTimeout, defaultRequestTimeout) timeout, err := strconv.ParseInt(timeoutString, 10, 64) if err != nil || timeout <= 0 { - return nil, fmt.Errorf("invalid value of %s for timeout", timeoutString) + return nil, fmt.Errorf("invalid value of %s for request timeout", timeoutString) } - session.sessOpts.Timeout = time.Duration(timeout) * time.Millisecond + session.sessOpts.RequestTimeout = time.Duration(timeout) * time.Millisecond + } + + // if no disconnect timeout then use the env or default + if session.sessOpts.DisconnectTimeout == time.Duration(0) { + timeoutString := getStringValueFromEnvVarOrDefault(envDisconnectTimeout, defaultDisconnectTimeout) + timeout, err := strconv.ParseInt(timeoutString, 10, 64) + if err != nil || timeout <= 0 { + return nil, fmt.Errorf("invalid value of %s for disconnect timeout", timeoutString) + } + session.sessOpts.DisconnectTimeout = time.Duration(timeout) * time.Millisecond + } + + // if no ready timeout then use the env or default + if session.sessOpts.ReadyTimeout == time.Duration(0) { + timeoutString := getStringValueFromEnvVarOrDefault(envReadyTimeout, defaultReadyTimeout) + timeout, err := strconv.ParseInt(timeoutString, 10, 64) + if err != nil || timeout < 0 { + return nil, fmt.Errorf("invalid value of %s for ready timeout", timeoutString) + } + session.sessOpts.ReadyTimeout = time.Duration(timeout) * time.Millisecond } // ensure initial connection @@ -184,10 +210,27 @@ func WithPlainText() func(sessionOptions *SessionOptions) { } } -// WithSessionTimeout returns a function to set the session timeout. -func WithSessionTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) { +// WithRequestTimeout returns a function to set the request timeout in millis. +func WithRequestTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) { return func(s *SessionOptions) { - s.Timeout = timeout + s.RequestTimeout = timeout + } +} + +// WithDisconnectTimeout returns a function to set the maximum amount of time, in millis, a [Session] +// may remain in a disconnected state without successfully reconnecting. +func WithDisconnectTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) { + return func(s *SessionOptions) { + s.DisconnectTimeout = timeout + } +} + +// WithReadyTimeout returns a function to set the maximum amount of time an [NamedMap] or [NamedCache] +// operations may wait for the underlying gRPC channel to be ready. This is independent of the request +// timeout which sets a deadline on how long the call may take after being dispatched. +func WithReadyTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) { + return func(s *SessionOptions) { + s.ReadyTimeout = timeout } } @@ -217,9 +260,19 @@ func (s *Session) String() string { len(s.caches), len(s.maps), s.sessOpts) } -// GetSessionTimeout returns the session timeout in seconds. -func (s *Session) GetSessionTimeout() time.Duration { - return s.sessOpts.Timeout +// GetRequestTimeout returns the session timeout in millis. +func (s *Session) GetRequestTimeout() time.Duration { + return s.sessOpts.RequestTimeout +} + +// GetDisconnectTimeout returns the session disconnect timeout in millis. +func (s *Session) GetDisconnectTimeout() time.Duration { + return s.sessOpts.DisconnectTimeout +} + +// GetReadyTimeout returns the session disconnect timeout in millis. +func (s *Session) GetReadyTimeout() time.Duration { + return s.sessOpts.ReadyTimeout } // ensureConnection ensures a session has a valid connection @@ -227,6 +280,10 @@ func (s *Session) ensureConnection() error { if s.firstConnectAttempted { // We have previously tried to connect so check that the connect state is connected if s.conn.GetState() != connectivity.Ready { + // if the readyTime is set, and we are not connected then block and wait for connection + if s.GetReadyTimeout() != 0 { + return waitForReady(s) + } s.debug(fmt.Sprintf("session: %s attempting connection to address %s", s.sessionID, s.sessOpts.Address)) s.conn.Connect() return nil @@ -285,11 +342,11 @@ func (s *Session) ensureConnection() error { // refer: https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html go func(session *Session) { var ( - firstConnect = true - connected = false - ctx = context.Background() - lastState = session.conn.GetState() - disconnectTime int64 = 0 + firstConnect = true + connected = false + ctx = context.Background() + lastState = session.conn.GetState() + disconnectTime int64 ) for { @@ -341,8 +398,9 @@ func (s *Session) ensureConnection() error { disconnectTime = time.Now().UnixMilli() } else { waited := time.Now().UnixMilli() - disconnectTime - if waited >= session.GetSessionTimeout().Milliseconds() { - log.Printf("session: %s unable to reconnect within [%s]. Closing session.", session.sessionID, session.GetSessionTimeout().String()) + if waited >= session.GetDisconnectTimeout().Milliseconds() { + log.Printf("session: %s unable to reconnect within disconnect timeout of [%s]. Closing session.", + session.sessionID, session.GetDisconnectTimeout().String()) session.Close() return } @@ -361,6 +419,38 @@ func (s *Session) ensureConnection() error { return nil } +// waitForReady waits until the connection is ready up to the ready session timeout and will +// return nil if the session was connected, otherwise an error is returned. +func waitForReady(s *Session) error { + var ( + readyTimeout = s.GetReadyTimeout() + messageLogged = false + ) + + //s.debug(fmt.Sprintf("State is %v, waiting until ready timeout of %v for valid connection", initialState, readyTimeout)) + + // try to connect up until timeout, then throw err if not available + timeout := time.Now().Add(readyTimeout) + for { + if time.Now().After(timeout) { + return fmt.Errorf("unable to connect to %s after ready timeout of %v", s.sessOpts.Address, readyTimeout) + } + + s.conn.Connect() + + time.Sleep(time.Duration(250) * time.Millisecond) + state := s.conn.GetState() + + if state == connectivity.Ready { + return nil + } + if !messageLogged { + log.Printf("State is %v, waiting until ready timeout of %v for valid connection", state, readyTimeout) + messageLogged = true + } + } +} + // GetOptions returns the options that were passed during this session creation. func (s *Session) GetOptions() *SessionOptions { return s.sessOpts @@ -530,8 +620,8 @@ func validateFilePath(file string) error { // String returns a string representation of SessionOptions. func (s *SessionOptions) String() string { var sb = strings.Builder{} - sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, timeout=%v", - s.Address, s.TLSEnabled, s.Scope, s.Format, s.Timeout)) + sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, request timeout=%v, disconnect timeout=%v, ready timeout=%v", + s.Address, s.TLSEnabled, s.Scope, s.Format, s.RequestTimeout, s.DisconnectTimeout, s.ReadyTimeout)) if s.TLSEnabled { sb.WriteString(fmt.Sprintf(" clientCertPath=%v, clientKeyPath=%v, caCertPath=%v,", @@ -556,8 +646,8 @@ func (s *Session) dispatch(eventType SessionLifecycleEventType, // [SessionOptions]. func (s *Session) ensureContext(ctx context.Context) (context.Context, context.CancelFunc) { if _, ok := ctx.Deadline(); !ok { - // no deadline set, so wrap the context in a Timeout - return context.WithTimeout(ctx, s.sessOpts.Timeout) + // no deadline set, so wrap the context in a RequestTimeout + return context.WithTimeout(ctx, s.sessOpts.RequestTimeout) } return ctx, nil } diff --git a/coherence/session_test.go b/coherence/session_test.go index dd6d2f6..75cda99 100644 --- a/coherence/session_test.go +++ b/coherence/session_test.go @@ -25,15 +25,51 @@ func TestSessionValidation(t *testing.T) { g.Expect(err).To(gomega.Equal(ErrInvalidFormat)) // test default timeout - timeout, _ := strconv.ParseInt(defaultSessionTimeout, 10, 64) + timeout, _ := strconv.ParseInt(defaultRequestTimeout, 10, 64) s, err := NewSession(ctx) g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) - g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(timeout) * time.Millisecond)) + g.Expect(s.sessOpts.RequestTimeout).To(gomega.Equal(time.Duration(timeout) * time.Millisecond)) - // test setting a timeout - s, err = NewSession(ctx, WithSessionTimeout(time.Duration(33)*time.Millisecond)) + // test setting a request timeout + s, err = NewSession(ctx, WithRequestTimeout(time.Duration(33)*time.Millisecond)) g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) - g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(33) * time.Millisecond)) + g.Expect(s.sessOpts.RequestTimeout).To(gomega.Equal(time.Duration(33) * time.Millisecond)) + + // test setting a disconnected timeout + s, err = NewSession(ctx, WithDisconnectTimeout(time.Duration(34)*time.Millisecond)) + g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) + g.Expect(s.sessOpts.DisconnectTimeout).To(gomega.Equal(time.Duration(34) * time.Millisecond)) + + // test setting a ready timeout + s, err = NewSession(ctx, WithReadyTimeout(time.Duration(35)*time.Millisecond)) + g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) + g.Expect(s.sessOpts.ReadyTimeout).To(gomega.Equal(time.Duration(35) * time.Millisecond)) +} + +func TestSessionEnvValidation(t *testing.T) { + var ( + g = gomega.NewWithT(t) + err error + ctx = context.Background() + ) + + // test default timeout + t.Setenv("COHERENCE_CLIENT_REQUEST_TIMEOUT", "5000") + s, err := NewSession(ctx) + g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) + g.Expect(s.sessOpts.RequestTimeout).To(gomega.Equal(time.Duration(5000) * time.Millisecond)) + + // test setting a disconnected timeout + t.Setenv("COHERENCE_SESSION_DISCONNECT_TIMEOUT", "6000") + s, err = NewSession(ctx) + g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) + g.Expect(s.sessOpts.DisconnectTimeout).To(gomega.Equal(time.Duration(6000) * time.Millisecond)) + + // test setting a ready timeout + t.Setenv("COHERENCE_READY_TIMEOUT", "7000") + s, err = NewSession(ctx) + g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) + g.Expect(s.sessOpts.ReadyTimeout).To(gomega.Equal(time.Duration(7000) * time.Millisecond)) } func TestSessionEnvDebug(t *testing.T) { @@ -72,7 +108,7 @@ func TestSessionTimeout(t *testing.T) { ctx = context.Background() ) - t.Setenv(envSessionTimeout, "-1") + t.Setenv(envRequestTimeout, "-1") _, err := NewSession(ctx) g.Expect(err).To(gomega.HaveOccurred()) } diff --git a/examples/events/cache/people_insert/main.go b/examples/events/cache/people_insert/main.go index e2b8d4e..2619460 100644 --- a/examples/events/cache/people_insert/main.go +++ b/examples/events/cache/people_insert/main.go @@ -61,6 +61,8 @@ func main() { } defer session.Close() + log.Printf("%v\n", session) + namedMap, err := coherence.GetNamedMap[int, Person](session, "people") if err != nil { panic(err) diff --git a/test/e2e/standalone/event_test.go b/test/e2e/standalone/event_test.go index f4ec989..e288dd0 100644 --- a/test/e2e/standalone/event_test.go +++ b/test/e2e/standalone/event_test.go @@ -220,7 +220,7 @@ func TestMapEventMultipleListeners(t *testing.T) { } } -// RunTestReconnect tests that a gRPC connection will reset it's self and the map listetners +// RunTestReconnect tests that a gRPC connection will reset it's self and the map listeners // will re-register correctly. func RunTestReconnect(g *gomega.WithT, namedMap coherence.NamedMap[string, string]) { defer func(cache coherence.NamedMap[string, string], ctx context.Context) { diff --git a/test/e2e/standalone/named_map_test.go b/test/e2e/standalone/named_map_test.go index 9db588c..1da1484 100644 --- a/test/e2e/standalone/named_map_test.go +++ b/test/e2e/standalone/named_map_test.go @@ -106,7 +106,7 @@ func TestSessionWithSpecifiedTimeout(t *testing.T) { session *coherence.Session ) - session, err = GetSession(coherence.WithSessionTimeout(time.Duration(10) * time.Second)) + session, err = GetSession(coherence.WithRequestTimeout(time.Duration(10) * time.Second)) g.Expect(err).ShouldNot(gomega.HaveOccurred()) defer session.Close() @@ -120,7 +120,7 @@ func TestSessionWithEnvTimeout(t *testing.T) { session *coherence.Session ) - t.Setenv("COHERENCE_SESSION_TIMEOUT", "10000") + t.Setenv("COHERENCE_CLIENT_REQUEST_TIMEOUT", "10000") session, err = GetSession() g.Expect(err).ShouldNot(gomega.HaveOccurred()) diff --git a/test/e2e/standalone/session_test.go b/test/e2e/standalone/session_test.go index ef620df..740fc45 100644 --- a/test/e2e/standalone/session_test.go +++ b/test/e2e/standalone/session_test.go @@ -18,13 +18,16 @@ import ( // TestCacheLifecycle runs tests to ensure correct behaviour when working with session events. func TestSessionLifecycle(t *testing.T) { g := NewWithT(t) + + t.Setenv("COHERENCE_SESSION_DEBUG", "true") + session, err := GetSession() g.Expect(err).ShouldNot(HaveOccurred()) listener := NewAllLifecycleEventsListener() session.AddSessionLifecycleListener(listener.listener) - Sleep(5) + Sleep(15) // close the session session.Close() From f08e443a8d1a479dc2a9158ba2779242bb5db020 Mon Sep 17 00:00:00 2001 From: Tim Middleton Date: Tue, 27 Jun 2023 08:10:27 +0800 Subject: [PATCH 2/3] Minor --- coherence/session.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coherence/session.go b/coherence/session.go index 4914aee..2d2886e 100644 --- a/coherence/session.go +++ b/coherence/session.go @@ -421,14 +421,14 @@ func (s *Session) ensureConnection() error { // waitForReady waits until the connection is ready up to the ready session timeout and will // return nil if the session was connected, otherwise an error is returned. +// We intentionally do no use the gRPC WaitForReady as this can cause a race condition in the session +// events code. func waitForReady(s *Session) error { var ( readyTimeout = s.GetReadyTimeout() messageLogged = false ) - //s.debug(fmt.Sprintf("State is %v, waiting until ready timeout of %v for valid connection", initialState, readyTimeout)) - // try to connect up until timeout, then throw err if not available timeout := time.Now().Add(readyTimeout) for { From 439baec6ea3d860f90f6564c3375d33637927d63 Mon Sep 17 00:00:00 2001 From: Tim Middleton Date: Tue, 27 Jun 2023 14:03:12 +0800 Subject: [PATCH 3/3] Updates to session reconnect tests --- coherence/session.go | 36 ++++++++++++++++++------------- test/e2e/standalone/event_test.go | 29 +++++++++++++++++++------ 2 files changed, 44 insertions(+), 21 deletions(-) diff --git a/coherence/session.go b/coherence/session.go index 2d2886e..c4fa9ea 100644 --- a/coherence/session.go +++ b/coherence/session.go @@ -149,38 +149,44 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) ( // if no request timeout then use the env or default if session.sessOpts.RequestTimeout == time.Duration(0) { - timeoutString := getStringValueFromEnvVarOrDefault(envRequestTimeout, defaultRequestTimeout) - timeout, err := strconv.ParseInt(timeoutString, 10, 64) - if err != nil || timeout <= 0 { - return nil, fmt.Errorf("invalid value of %s for request timeout", timeoutString) + timeout, err := getTimeoutValue(envRequestTimeout, defaultRequestTimeout, "request timeout") + if err != nil { + return nil, err } - session.sessOpts.RequestTimeout = time.Duration(timeout) * time.Millisecond + session.sessOpts.RequestTimeout = timeout } // if no disconnect timeout then use the env or default if session.sessOpts.DisconnectTimeout == time.Duration(0) { - timeoutString := getStringValueFromEnvVarOrDefault(envDisconnectTimeout, defaultDisconnectTimeout) - timeout, err := strconv.ParseInt(timeoutString, 10, 64) - if err != nil || timeout <= 0 { - return nil, fmt.Errorf("invalid value of %s for disconnect timeout", timeoutString) + timeout, err := getTimeoutValue(envDisconnectTimeout, defaultDisconnectTimeout, "disconnect timeout") + if err != nil { + return nil, err } - session.sessOpts.DisconnectTimeout = time.Duration(timeout) * time.Millisecond + session.sessOpts.DisconnectTimeout = timeout } // if no ready timeout then use the env or default if session.sessOpts.ReadyTimeout == time.Duration(0) { - timeoutString := getStringValueFromEnvVarOrDefault(envReadyTimeout, defaultReadyTimeout) - timeout, err := strconv.ParseInt(timeoutString, 10, 64) - if err != nil || timeout < 0 { - return nil, fmt.Errorf("invalid value of %s for ready timeout", timeoutString) + timeout, err := getTimeoutValue(envReadyTimeout, defaultReadyTimeout, "ready timeout") + if err != nil { + return nil, err } - session.sessOpts.ReadyTimeout = time.Duration(timeout) * time.Millisecond + session.sessOpts.ReadyTimeout = timeout } // ensure initial connection return session, session.ensureConnection() } +func getTimeoutValue(envVar, defaultValue, description string) (time.Duration, error) { + timeoutString := getStringValueFromEnvVarOrDefault(envVar, defaultValue) + timeout, err := strconv.ParseInt(timeoutString, 10, 64) + if err != nil || timeout < 0 { + return 0, fmt.Errorf("invalid value of %s for %s", timeoutString, description) + } + return time.Duration(timeout) * time.Millisecond, nil +} + // WithAddress returns a function to set the address for session. func WithAddress(host string) func(sessionOptions *SessionOptions) { return func(s *SessionOptions) { diff --git a/test/e2e/standalone/event_test.go b/test/e2e/standalone/event_test.go index e288dd0..39cc362 100644 --- a/test/e2e/standalone/event_test.go +++ b/test/e2e/standalone/event_test.go @@ -110,10 +110,25 @@ func TestEventDisconnect(t *testing.T) { namedCache := GetNamedCache[string, string](g, session, "test-reconnect-cache") - RunTestReconnect(g, namedCache) + RunTestReconnect(g, namedCache, true) namedMap := GetNamedMap[string, string](g, session, "test-reconnect-map") - RunTestReconnect(g, namedMap) + RunTestReconnect(g, namedMap, true) +} + +// TestEventDisconnectWithReadyTimeoutDelay tests that the ready timeout is honoured +// and we can just issue a command and not sleep. +func TestEventDisconnectWithReadyTimeoutDelay(t *testing.T) { + t.Setenv("COHERENCE_SESSION_DEBUG", "true") + g, session := initTest(t, coherence.WithReadyTimeout(time.Duration(120000)*time.Millisecond)) + defer session.Close() + + namedCache := GetNamedCache[string, string](g, session, "test-reconnect-cache") + + RunTestReconnect(g, namedCache, false) + + namedMap := GetNamedMap[string, string](g, session, "test-reconnect-map") + RunTestReconnect(g, namedMap, false) } func TestMapEventInsertsOnly(t *testing.T) { @@ -221,8 +236,8 @@ func TestMapEventMultipleListeners(t *testing.T) { } // RunTestReconnect tests that a gRPC connection will reset it's self and the map listeners -// will re-register correctly. -func RunTestReconnect(g *gomega.WithT, namedMap coherence.NamedMap[string, string]) { +// will re-register correctly. If doSleep is true then a 60-second sleep is added, otherwise no sleep is included. +func RunTestReconnect(g *gomega.WithT, namedMap coherence.NamedMap[string, string], doSleep bool) { defer func(cache coherence.NamedMap[string, string], ctx context.Context) { err := cache.Destroy(ctx) if err != nil { @@ -251,8 +266,10 @@ func RunTestReconnect(g *gomega.WithT, namedMap coherence.NamedMap[string, strin _, err = IssuePostRequest("http://127.0.0.1:30000/management/coherence/cluster/services/$GRPC:GrpcProxy/members/1/stop") g.Expect(err).ShouldNot(gomega.HaveOccurred()) - // sleep for 60 seconds to give the shutdown time to take effect - Sleep(60) + if doSleep { + // sleep for 60 seconds to give the shutdown time to take effect + Sleep(60) + } // add another 'additional' mutations createMutations(g, namedMap, additional)