Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# ----------------------------------------------------------------------------------------------------------------------

# This is the version of the coherence-go-client
VERSION ?=1.0.0-rc2
VERSION ?=1.0.0-rc4
CURRDIR := $(shell pwd)
USER_ID := $(shell echo "`id -u`:`id -g`")

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ For local development, we recommend using the Coherence CE Docker image; it cont
everything necessary for the client to operate correctly.

```bash
docker run -d -p 1408:1408 ghcr.io/oracle/coherence-ce:22.06.4
docker run -d -p 1408:1408 -p 30000:30000 ghcr.io/oracle/coherence-ce:22.06.4
```

## Installation
Expand Down
4 changes: 3 additions & 1 deletion coherence/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ const (
envTLSClientCert = "COHERENCE_TLS_CLIENT_CERT"
envTLSClientKey = "COHERENCE_TLS_CLIENT_KEY"
envIgnoreInvalidCerts = "COHERENCE_IGNORE_INVALID_CERTS"
envSessionTimeout = "COHERENCE_SESSION_TIMEOUT"
envRequestTimeout = "COHERENCE_CLIENT_REQUEST_TIMEOUT"
envDisconnectTimeout = "COHERENCE_SESSION_DISCONNECT_TIMEOUT"
envReadyTimeout = "COHERENCE_READY_TIMEOUT"

// envSessionDebug enabled session debug messages to be displayed.
envSessionDebug = "COHERENCE_SESSION_DEBUG"
Expand Down
19 changes: 14 additions & 5 deletions coherence/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,26 @@ Refer to the section on [NewSession] for more information on setting up a SSL co

See [SessionOptions] which lists all the options supported by the [Session] API.

# Controlling connection timeouts
# Controlling timeouts

Most operations you call require you to supply a [context.Context]. If your context does not contain a deadline,
the operation will wrap your context in a new [context.WithTimeout] using either the default timeout of 30,000 millis or
the value you set using option [coherence.WithSessionTimeout] when you called [NewSession].
the value you set using option [coherence.WithRequestTimeout] when you called [NewSession].

For example, to override the default timeout of 30,000 millis with one of 5 seconds for a [Session] you can do the following:
For example, to override the default request timeout of 30,000 millis with one of 5 seconds for a [Session] you can do the following:

session, err = coherence.NewSession(ctx, coherence.WithSessionTimeout(time.Duration(5) * time.Second))
session, err = coherence.NewSession(ctx, coherence.WithRequestTimeout(time.Duration(5) * time.Second))

You can also override the default timeout using the environment variable COHERENCE_SESSION_TIMEOUT.
You can also override the default request timeout using the environment variable COHERENCE_CLIENT_REQUEST_TIMEOUT.

By default, if an endpoint is not ready, the Go client will fail-fast. You can change this behaviour by setting
the option [coherence.WithReadyTimeout] to a value millis value greater than zero which will cause the Go client
to wait until up to the timeout specified until it fails if no endpoint is available. You can also use the environment variable
COHERENCE_READY_TIMEOUT.

You also have the ability to control maximum amount of time, in milliseconds, a [Session] may remain in a disconnected state
without successfully reconnecting. For this you use the option [coherence.WithDisconnectTimeout] or the environment
variable COHERENCE_SESSION_DISCONNECT_TIMEOUT.

# Obtaining a NamedMap or NamedCache

Expand Down
174 changes: 135 additions & 39 deletions coherence/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ import (
var ErrInvalidFormat = errors.New("format can only be 'json'")

const (
defaultFormat = "json"
mapOrCacheExists = "the %s %s already exists with different type parameters"
defaultSessionTimeout = "30000" // millis
defaultFormat = "json"
mapOrCacheExists = "the %s %s already exists with different type parameters"
defaultRequestTimeout = "30000" // millis
defaultDisconnectTimeout = "30000" // millis
defaultReadyTimeout = "0" // millis
)

// Session provides APIs to create NamedCaches. The [NewSession] method creates a
Expand All @@ -56,15 +58,17 @@ type Session struct {

// SessionOptions holds the session attributes like host, port, tls attributes etc.
type SessionOptions struct {
Address string
TLSEnabled bool
Scope string
Format string
ClientCertPath string
ClientKeyPath string
CaCertPath string
PlainText bool
Timeout time.Duration
Address string
TLSEnabled bool
Scope string
Format string
ClientCertPath string
ClientKeyPath string
CaCertPath string
PlainText bool
RequestTimeout time.Duration
DisconnectTimeout time.Duration
ReadyTimeout time.Duration
}

// NewSession creates a new Session with the specified sessionOptions.
Expand Down Expand Up @@ -115,9 +119,11 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
caches: make(map[string]interface{}, 0),
lifecycleListeners: []*SessionLifecycleListener{},
sessOpts: &SessionOptions{
PlainText: false,
Format: defaultFormat,
Timeout: time.Duration(0) * time.Second},
PlainText: false,
Format: defaultFormat,
RequestTimeout: time.Duration(0) * time.Second,
ReadyTimeout: time.Duration(0) * time.Second,
DisconnectTimeout: time.Duration(0) * time.Second},
}

if getBoolValueFromEnvVarOrDefault(envSessionDebug, false) {
Expand All @@ -141,20 +147,46 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
session.sessOpts.Address = getStringValueFromEnvVarOrDefault(envHostName, "localhost:1408")
}

// if no timeout then use the env or default
if session.sessOpts.Timeout == time.Duration(0) {
timeoutString := getStringValueFromEnvVarOrDefault(envSessionTimeout, defaultSessionTimeout)
timeout, err := strconv.ParseInt(timeoutString, 10, 64)
if err != nil || timeout <= 0 {
return nil, fmt.Errorf("invalid value of %s for timeout", timeoutString)
// if no request timeout then use the env or default
if session.sessOpts.RequestTimeout == time.Duration(0) {
timeout, err := getTimeoutValue(envRequestTimeout, defaultRequestTimeout, "request timeout")
if err != nil {
return nil, err
}
session.sessOpts.Timeout = time.Duration(timeout) * time.Millisecond
session.sessOpts.RequestTimeout = timeout
}

// if no disconnect timeout then use the env or default
if session.sessOpts.DisconnectTimeout == time.Duration(0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be refactored to avoid duplication?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

timeout, err := getTimeoutValue(envDisconnectTimeout, defaultDisconnectTimeout, "disconnect timeout")
if err != nil {
return nil, err
}
session.sessOpts.DisconnectTimeout = timeout
}

// if no ready timeout then use the env or default
if session.sessOpts.ReadyTimeout == time.Duration(0) {
timeout, err := getTimeoutValue(envReadyTimeout, defaultReadyTimeout, "ready timeout")
if err != nil {
return nil, err
}
session.sessOpts.ReadyTimeout = timeout
}

// ensure initial connection
return session, session.ensureConnection()
}

func getTimeoutValue(envVar, defaultValue, description string) (time.Duration, error) {
timeoutString := getStringValueFromEnvVarOrDefault(envVar, defaultValue)
timeout, err := strconv.ParseInt(timeoutString, 10, 64)
if err != nil || timeout < 0 {
return 0, fmt.Errorf("invalid value of %s for %s", timeoutString, description)
}
return time.Duration(timeout) * time.Millisecond, nil
}

// WithAddress returns a function to set the address for session.
func WithAddress(host string) func(sessionOptions *SessionOptions) {
return func(s *SessionOptions) {
Expand Down Expand Up @@ -184,10 +216,27 @@ func WithPlainText() func(sessionOptions *SessionOptions) {
}
}

// WithSessionTimeout returns a function to set the session timeout.
func WithSessionTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) {
// WithRequestTimeout returns a function to set the request timeout in millis.
func WithRequestTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) {
return func(s *SessionOptions) {
s.RequestTimeout = timeout
}
}

// WithDisconnectTimeout returns a function to set the maximum amount of time, in millis, a [Session]
// may remain in a disconnected state without successfully reconnecting.
func WithDisconnectTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) {
return func(s *SessionOptions) {
s.Timeout = timeout
s.DisconnectTimeout = timeout
}
}

// WithReadyTimeout returns a function to set the maximum amount of time an [NamedMap] or [NamedCache]
// operations may wait for the underlying gRPC channel to be ready. This is independent of the request
// timeout which sets a deadline on how long the call may take after being dispatched.
func WithReadyTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) {
return func(s *SessionOptions) {
s.ReadyTimeout = timeout
}
}

Expand Down Expand Up @@ -217,16 +266,30 @@ func (s *Session) String() string {
len(s.caches), len(s.maps), s.sessOpts)
}

// GetSessionTimeout returns the session timeout in seconds.
func (s *Session) GetSessionTimeout() time.Duration {
return s.sessOpts.Timeout
// GetRequestTimeout returns the session timeout in millis.
func (s *Session) GetRequestTimeout() time.Duration {
return s.sessOpts.RequestTimeout
}

// GetDisconnectTimeout returns the session disconnect timeout in millis.
func (s *Session) GetDisconnectTimeout() time.Duration {
return s.sessOpts.DisconnectTimeout
}

// GetReadyTimeout returns the session disconnect timeout in millis.
func (s *Session) GetReadyTimeout() time.Duration {
return s.sessOpts.ReadyTimeout
}

// ensureConnection ensures a session has a valid connection
func (s *Session) ensureConnection() error {
if s.firstConnectAttempted {
// We have previously tried to connect so check that the connect state is connected
if s.conn.GetState() != connectivity.Ready {
// if the readyTime is set, and we are not connected then block and wait for connection
if s.GetReadyTimeout() != 0 {
return waitForReady(s)
}
s.debug(fmt.Sprintf("session: %s attempting connection to address %s", s.sessionID, s.sessOpts.Address))
s.conn.Connect()
return nil
Expand Down Expand Up @@ -285,11 +348,11 @@ func (s *Session) ensureConnection() error {
// refer: https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
go func(session *Session) {
var (
firstConnect = true
connected = false
ctx = context.Background()
lastState = session.conn.GetState()
disconnectTime int64 = 0
firstConnect = true
connected = false
ctx = context.Background()
lastState = session.conn.GetState()
disconnectTime int64
)

for {
Expand Down Expand Up @@ -341,8 +404,9 @@ func (s *Session) ensureConnection() error {
disconnectTime = time.Now().UnixMilli()
} else {
waited := time.Now().UnixMilli() - disconnectTime
if waited >= session.GetSessionTimeout().Milliseconds() {
log.Printf("session: %s unable to reconnect within [%s]. Closing session.", session.sessionID, session.GetSessionTimeout().String())
if waited >= session.GetDisconnectTimeout().Milliseconds() {
log.Printf("session: %s unable to reconnect within disconnect timeout of [%s]. Closing session.",
session.sessionID, session.GetDisconnectTimeout().String())
session.Close()
return
}
Expand All @@ -361,6 +425,38 @@ func (s *Session) ensureConnection() error {
return nil
}

// waitForReady waits until the connection is ready up to the ready session timeout and will
// return nil if the session was connected, otherwise an error is returned.
// We intentionally do no use the gRPC WaitForReady as this can cause a race condition in the session
// events code.
func waitForReady(s *Session) error {
var (
readyTimeout = s.GetReadyTimeout()
messageLogged = false
)

// try to connect up until timeout, then throw err if not available
timeout := time.Now().Add(readyTimeout)
for {
if time.Now().After(timeout) {
return fmt.Errorf("unable to connect to %s after ready timeout of %v", s.sessOpts.Address, readyTimeout)
}

s.conn.Connect()

time.Sleep(time.Duration(250) * time.Millisecond)
state := s.conn.GetState()

if state == connectivity.Ready {
return nil
}
if !messageLogged {
log.Printf("State is %v, waiting until ready timeout of %v for valid connection", state, readyTimeout)
messageLogged = true
}
}
}

// GetOptions returns the options that were passed during this session creation.
func (s *Session) GetOptions() *SessionOptions {
return s.sessOpts
Expand Down Expand Up @@ -530,8 +626,8 @@ func validateFilePath(file string) error {
// String returns a string representation of SessionOptions.
func (s *SessionOptions) String() string {
var sb = strings.Builder{}
sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, timeout=%v",
s.Address, s.TLSEnabled, s.Scope, s.Format, s.Timeout))
sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, request timeout=%v, disconnect timeout=%v, ready timeout=%v",
s.Address, s.TLSEnabled, s.Scope, s.Format, s.RequestTimeout, s.DisconnectTimeout, s.ReadyTimeout))

if s.TLSEnabled {
sb.WriteString(fmt.Sprintf(" clientCertPath=%v, clientKeyPath=%v, caCertPath=%v,",
Expand All @@ -556,8 +652,8 @@ func (s *Session) dispatch(eventType SessionLifecycleEventType,
// [SessionOptions].
func (s *Session) ensureContext(ctx context.Context) (context.Context, context.CancelFunc) {
if _, ok := ctx.Deadline(); !ok {
// no deadline set, so wrap the context in a Timeout
return context.WithTimeout(ctx, s.sessOpts.Timeout)
// no deadline set, so wrap the context in a RequestTimeout
return context.WithTimeout(ctx, s.sessOpts.RequestTimeout)
}
return ctx, nil
}
48 changes: 42 additions & 6 deletions coherence/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,51 @@ func TestSessionValidation(t *testing.T) {
g.Expect(err).To(gomega.Equal(ErrInvalidFormat))

// test default timeout
timeout, _ := strconv.ParseInt(defaultSessionTimeout, 10, 64)
timeout, _ := strconv.ParseInt(defaultRequestTimeout, 10, 64)
s, err := NewSession(ctx)
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(timeout) * time.Millisecond))
g.Expect(s.sessOpts.RequestTimeout).To(gomega.Equal(time.Duration(timeout) * time.Millisecond))

// test setting a timeout
s, err = NewSession(ctx, WithSessionTimeout(time.Duration(33)*time.Millisecond))
// test setting a request timeout
s, err = NewSession(ctx, WithRequestTimeout(time.Duration(33)*time.Millisecond))
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(33) * time.Millisecond))
g.Expect(s.sessOpts.RequestTimeout).To(gomega.Equal(time.Duration(33) * time.Millisecond))

// test setting a disconnected timeout
s, err = NewSession(ctx, WithDisconnectTimeout(time.Duration(34)*time.Millisecond))
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.DisconnectTimeout).To(gomega.Equal(time.Duration(34) * time.Millisecond))

// test setting a ready timeout
s, err = NewSession(ctx, WithReadyTimeout(time.Duration(35)*time.Millisecond))
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.ReadyTimeout).To(gomega.Equal(time.Duration(35) * time.Millisecond))
}

func TestSessionEnvValidation(t *testing.T) {
var (
g = gomega.NewWithT(t)
err error
ctx = context.Background()
)

// test default timeout
t.Setenv("COHERENCE_CLIENT_REQUEST_TIMEOUT", "5000")
s, err := NewSession(ctx)
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.RequestTimeout).To(gomega.Equal(time.Duration(5000) * time.Millisecond))

// test setting a disconnected timeout
t.Setenv("COHERENCE_SESSION_DISCONNECT_TIMEOUT", "6000")
s, err = NewSession(ctx)
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.DisconnectTimeout).To(gomega.Equal(time.Duration(6000) * time.Millisecond))

// test setting a ready timeout
t.Setenv("COHERENCE_READY_TIMEOUT", "7000")
s, err = NewSession(ctx)
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.ReadyTimeout).To(gomega.Equal(time.Duration(7000) * time.Millisecond))
}

func TestSessionEnvDebug(t *testing.T) {
Expand Down Expand Up @@ -72,7 +108,7 @@ func TestSessionTimeout(t *testing.T) {
ctx = context.Background()
)

t.Setenv(envSessionTimeout, "-1")
t.Setenv(envRequestTimeout, "-1")
_, err := NewSession(ctx)
g.Expect(err).To(gomega.HaveOccurred())
}
Loading