Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 164 additions & 39 deletions coherence/common.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions coherence/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ Refer to the section on [NewSession] for more information on setting up a SSL co

See [SessionOptions] which lists all the options supported by the [Session] API.

# Controlling connection timeouts

Most operations you call require you to supply a [context.Context]. If your context does not contain a deadline,
the operation will wrap your context in a new [context.WithTimeout] using either the default timeout of 30,000 millis or
the value you set using option [coherence.WithSessionTimeout] when you called [NewSession].

For example, to override the default timeout of 30,000 millis with one of 5 seconds for a [Session] you can do the following:

session, err = coherence.NewSession(ctx, coherence.WithSessionTimeout(time.Duration(5) * time.Second))

You can also override the default timeout using the environment variable COHERENCE_SESSION_TIMEOUT.

# Obtaining a NamedMap or NamedCache

Once a session has been created, the [GetNamedMap](session, name, ...options) or [GetNamedCache](session, name, ...options)
Expand Down
14 changes: 12 additions & 2 deletions coherence/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ func (it *streamedKeyIterator[K, V]) getNextPage() error {
return err
}

newCtx, cancel := it.bc.session.ensureContext(it.ctx)
if cancel != nil {
defer cancel()
}

request := &pb.PageRequest{Scope: it.bc.sessionOpts.Scope, Cache: it.bc.name, Format: it.bc.format, Cookie: it.cookie}

if client, err = it.bc.client.NextKeySetPage(it.ctx, request); err != nil {
if client, err = it.bc.client.NextKeySetPage(newCtx, request); err != nil {
return err
}

Expand Down Expand Up @@ -221,9 +226,14 @@ func (it *streamedEntryIterator[K, V]) getNextPage() error {
return err
}

newCtx, cancel := it.bc.session.ensureContext(it.ctx)
if cancel != nil {
defer cancel()
}

request := &pb.PageRequest{Scope: it.bc.sessionOpts.Scope, Cache: it.bc.name, Format: it.bc.format, Cookie: it.cookie}

if client, err = it.bc.client.NextEntrySetPage(it.ctx, request); err != nil {
if client, err = it.bc.client.NextEntrySetPage(newCtx, request); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions coherence/named_cache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func newNamedCache[K comparable, V any](session *Session, name string, sOpts *Se
unlocked = true
session.mutex.Unlock()

listener := newNamedCacheReconnectListener[K, V](session, *newCache)
listener := newNamedCacheReconnectListener[K, V](*newCache)
newCache.namedCacheReconnectListener = *listener

// unlock before adding reconnect listener
Expand All @@ -591,15 +591,15 @@ type namedCacheReconnectListener[K comparable, V any] struct {
}

// newReconnectSessionListener creates a new namedCacheReconnectListener.
func newNamedCacheReconnectListener[K comparable, V any](session *Session, nc NamedCacheClient[K, V]) *namedCacheReconnectListener[K, V] {
func newNamedCacheReconnectListener[K comparable, V any](nc NamedCacheClient[K, V]) *namedCacheReconnectListener[K, V] {
listener := namedCacheReconnectListener[K, V]{
listener: NewSessionLifecycleListener(),
}

listener.listener.OnReconnected(func(e SessionLifecycleEvent) {
// re-register listeners for the NamedCache
namedMap := convertNamedCacheClient[K, V](&nc)
if err := reRegisterListeners[K, V](session.sessionConnectCtx, &namedMap, &nc.baseClient); err != nil {
if err := reRegisterListeners[K, V](context.Background(), &namedMap, &nc.baseClient); err != nil {
log.Println(err)
}
})
Expand Down
8 changes: 3 additions & 5 deletions coherence/named_map_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,6 @@ func (nm *NamedMapClient[K, V]) KeySetFilter(ctx context.Context, fltr filters.F
// log.Fatal(err)
// }
//
// iter := namedMap.KeySet(ctx)
//
// ch := namedMap.KeySet(ctx)
// for result := range ch {
// if result.Err != nil {
Expand Down Expand Up @@ -792,7 +790,7 @@ func newNamedMap[K comparable, V any](session *Session, name string, sOpts *Sess
unlocked = true
session.mutex.Unlock()

listener := newNamedMapReconnectListener[K, V](session, *newMap)
listener := newNamedMapReconnectListener[K, V](*newMap)
newMap.namedMapReconnectListener = *listener

// unlock before adding reconnect listener
Expand All @@ -808,15 +806,15 @@ type namedMapReconnectListener[K comparable, V any] struct {
}

// newReconnectSessionListener creates new namedMapReconnectListener.
func newNamedMapReconnectListener[K comparable, V any](session *Session, nm NamedMapClient[K, V]) *namedMapReconnectListener[K, V] {
func newNamedMapReconnectListener[K comparable, V any](nm NamedMapClient[K, V]) *namedMapReconnectListener[K, V] {
listener := namedMapReconnectListener[K, V]{
listener: NewSessionLifecycleListener(),
}

listener.listener.OnReconnected(func(e SessionLifecycleEvent) {
// re-register listeners for the NamedMap
namedMap := convertNamedMapClient[K, V](&nm)
if err := reRegisterListeners[K, V](session.sessionConnectCtx, &namedMap, &nm.baseClient); err != nil {
if err := reRegisterListeners[K, V](context.Background(), &namedMap, &nm.baseClient); err != nil {
log.Println(err)
}
})
Expand Down
65 changes: 53 additions & 12 deletions coherence/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@ import (
"google.golang.org/grpc/credentials/insecure"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
)

// ErrInvalidFormat indicates that the serialization format can only be JSON.
var ErrInvalidFormat = errors.New("format can only be 'json'")

const (
defaultFormat = "json"
mapOrCacheExists = "the %s %s already exists with different type parameters"
defaultFormat = "json"
mapOrCacheExists = "the %s %s already exists with different type parameters"
defaultSessionTimeout = "30000" // millis
)

// Session provides APIs to create NamedCaches. The NewSession() method creates a
// new instance of a Session. This method also takes a variable number of arguments, called options,
// Session provides APIs to create NamedCaches. The [NewSession] method creates a
// new instance of a [Session]. This method also takes a variable number of arguments, called options,
// that can be passed to configure the Session.
type Session struct {
sessionID uuid.UUID
Expand Down Expand Up @@ -60,6 +63,7 @@ type SessionOptions struct {
ClientKeyPath string
CaCertPath string
PlainText bool
Timeout time.Duration
}

// NewSession creates a new Session with the specified sessionOptions.
Expand Down Expand Up @@ -94,8 +98,8 @@ type SessionOptions struct {
// export COHERENCE_TLS_CERTS_PATH=/path/to/cert/to/be/added/for/trust
// export COHERENCE_IGNORE_INVALID_CERTS=true // option to ignore self-signed certificates - for testing only. Not to be used in production
//
// Finally, the Close() method can be used to close the Session. Once a Session is closed, no APIs
// on the NamedMap instances should be invoked. If invoked they all will return an error.
// Finally, the Close() method can be used to close the [Session]. Once a [Session] is closed, no APIs
// on the [NamedMap] instances should be invoked. If invoked they will return an error.
// [gRPC Naming]: https://github.com/grpc/grpc/blob/master/doc/naming.md
// [gRPC Proxy Server]: https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/develop-remote-clients/using-coherence-grpc-server.html
func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (*Session, error) {
Expand All @@ -111,7 +115,8 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
lifecycleListeners: []*SessionLifecycleListener{},
sessOpts: &SessionOptions{
PlainText: false,
Format: defaultFormat},
Format: defaultFormat,
Timeout: time.Duration(0) * time.Second},
}

if getBoolValueFromEnvVarOrDefault(envSessionDebug, false) {
Expand All @@ -135,9 +140,18 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
session.sessOpts.Address = getStringValueFromEnvVarOrDefault(envHostName, "localhost:1408")
}

// if no timeout then use the env or default
if session.sessOpts.Timeout == time.Duration(0) {
timeoutString := getStringValueFromEnvVarOrDefault(envSessionTimeout, defaultSessionTimeout)
timeout, err := strconv.ParseInt(timeoutString, 10, 64)
if err != nil || timeout <= 0 {
return nil, fmt.Errorf("invalid value of %s for timeout", timeoutString)
}
session.sessOpts.Timeout = time.Duration(timeout) * time.Millisecond
}

// ensure initial connection
err := session.ensureConnection()
return session, err
return session, session.ensureConnection()
}

// WithAddress returns a function to set the address for session.
Expand Down Expand Up @@ -169,6 +183,13 @@ func WithPlainText() func(sessionOptions *SessionOptions) {
}
}

// WithSessionTimeout returns a function to set the session timeout.
func WithSessionTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) {
return func(s *SessionOptions) {
s.Timeout = timeout
}
}

// ID returns the identifier of a session.
func (s *Session) ID() string {
return s.sessionID.String()
Expand All @@ -190,6 +211,11 @@ func (s *Session) String() string {
len(s.caches), len(s.maps), s.sessOpts)
}

// GetSessionTimeout returns the session timeout in seconds.
func (s *Session) GetSessionTimeout() time.Duration {
return s.sessOpts.Timeout
}

// ensureConnection ensures a session has a valid connection
func (s *Session) ensureConnection() error {
if s.firstConnectAttempted {
Expand Down Expand Up @@ -222,7 +248,12 @@ func (s *Session) ensureConnection() error {
s.mutex.Lock()
locked = true

conn, err := grpc.DialContext(s.sessionConnectCtx, s.sessOpts.Address, s.dialOptions...)
newCtx, cancel := s.ensureContext(s.sessionConnectCtx)
if cancel != nil {
defer cancel()
}

conn, err := grpc.DialContext(newCtx, s.sessOpts.Address, s.dialOptions...)

if err != nil {
log.Printf("could not connect. Reason: %v", err)
Expand Down Expand Up @@ -469,8 +500,8 @@ func validateFilePath(file string) error {
// String returns a string representation of SessionOptions.
func (s *SessionOptions) String() string {
var sb = strings.Builder{}
sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v,",
s.Address, s.TLSEnabled, s.Scope, s.Format))
sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, timeout=%v",
s.Address, s.TLSEnabled, s.Scope, s.Format, s.Timeout))

if s.TLSEnabled {
sb.WriteString(fmt.Sprintf(" clientCertPath=%v, clientKeyPath=%v, caCertPath=%v,",
Expand All @@ -490,3 +521,13 @@ func (s *Session) dispatch(eventType SessionLifecycleEventType,
}
}
}

// ensureContext will ensure that the context has deadline and if not will apply the timeout from the
// [SessionOptions].
func (s *Session) ensureContext(ctx context.Context) (context.Context, context.CancelFunc) {
if _, ok := ctx.Deadline(); !ok {
// no deadline set, so wrap the context in a Timeout
return context.WithTimeout(ctx, s.sessOpts.Timeout)
}
return ctx, nil
}
13 changes: 13 additions & 0 deletions coherence/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ package coherence
import (
"context"
"github.com/onsi/gomega"
"strconv"
"testing"
"time"
)

func TestSessionValidation(t *testing.T) {
Expand All @@ -21,4 +23,15 @@ func TestSessionValidation(t *testing.T) {

_, err = NewSession(ctx, WithFormat("not-json"))
g.Expect(err).To(gomega.Equal(ErrInvalidFormat))

// test default timeout
timeout, _ := strconv.ParseInt(defaultSessionTimeout, 10, 64)
s, err := NewSession(ctx)
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(timeout) * time.Millisecond))

// test setting a timeout
s, err = NewSession(ctx, WithSessionTimeout(time.Duration(33)*time.Millisecond))
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(33) * time.Millisecond))
}
47 changes: 47 additions & 0 deletions test/e2e/standalone/named_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,53 @@ func TestBasicCrudOperationsVariousTypes(t *testing.T) {
map[int]string{1: "one", 2: "two", 3: "three"})
}

func TestSessionWithSpecifiedTimeout(t *testing.T) {
var (
g = gomega.NewWithT(t)
err error
session *coherence.Session
)

session, err = GetSession(coherence.WithSessionTimeout(time.Duration(10) * time.Second))
g.Expect(err).ShouldNot(gomega.HaveOccurred())
defer session.Close()

runTimeoutTest(g, session)
}

func TestSessionWithEnvTimeout(t *testing.T) {
var (
g = gomega.NewWithT(t)
err error
session *coherence.Session
)

t.Setenv("COHERENCE_SESSION_TIMEOUT", "10000")

session, err = GetSession()
g.Expect(err).ShouldNot(gomega.HaveOccurred())
defer session.Close()

runTimeoutTest(g, session)
}

func runTimeoutTest(g *gomega.WithT, session *coherence.Session) {
// we should get an error as we should be > default timeout
namedMap := getNewNamedMap[int, string](g, session, "timeout-map")
err := namedMap.Clear(ctx)
g.Expect(err).ShouldNot(gomega.HaveOccurred())

namedCache := getNewNamedCache[int, string](g, session, "timeout-cache")
err = namedCache.Clear(ctx)
g.Expect(err).ShouldNot(gomega.HaveOccurred())

// create a new context with an existing deadline, it should be honored
ctxNew, cancel := context.WithTimeout(ctx, time.Duration(1)*time.Nanosecond)
defer cancel()
err = namedCache.Clear(ctxNew)
g.Expect(err).Should(gomega.HaveOccurred())
}

// TestBasicCrudOperationsVariousTypesWithStructKey tests operations against caches that have keys and values as structs.
func TestBasicCrudOperationsVariousTypesWithStructKey(t *testing.T) {
var (
Expand Down