diff --git a/coherence/common.go b/coherence/common.go index 3f75299..3d5b595 100644 --- a/coherence/common.go +++ b/coherence/common.go @@ -27,6 +27,7 @@ const ( envTLSClientCert = "COHERENCE_TLS_CLIENT_CERT" envTLSClientKey = "COHERENCE_TLS_CLIENT_KEY" envIgnoreInvalidCerts = "COHERENCE_IGNORE_INVALID_CERTS" + envSessionTimeout = "COHERENCE_SESSION_TIMEOUT" // envSessionDebug enabled session debug messages to be displayed. envSessionDebug = "COHERENCE_SESSION_DEBUG" @@ -81,9 +82,14 @@ func executeClear[K comparable, V any](ctx context.Context, bc *baseClient[K, V] return err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + clearRequest := pb.ClearRequest{Cache: bc.name, Scope: bc.sessionOpts.Scope} - _, err = bc.client.Clear(ctx, &clearRequest) + _, err = bc.client.Clear(newCtx, &clearRequest) return err } @@ -116,6 +122,11 @@ func executeAddIndex[K comparable, V, T, E any](ctx context.Context, bc *baseCli return err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binExtractor, err = extractorSerializer.Serialize(extractor) if err != nil { return err @@ -128,7 +139,7 @@ func executeAddIndex[K comparable, V, T, E any](ctx context.Context, bc *baseCli addIndexRequest := pb.AddIndexRequest{ Cache: bc.name, Scope: bc.sessionOpts.Scope, Format: bc.format, Extractor: binExtractor, Sorted: sorted, Comparator: binComparator} - _, err = bc.client.AddIndex(ctx, &addIndexRequest) + _, err = bc.client.AddIndex(newCtx, &addIndexRequest) return err } @@ -144,6 +155,11 @@ func executeRemoveIndex[K comparable, V, T, E any](ctx context.Context, bc *base return err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binExtractor, err = extractorSerializer.Serialize(extractor) if err != nil { return err @@ -152,7 +168,7 @@ func executeRemoveIndex[K comparable, V, T, E any](ctx context.Context, bc *base removeIndexRequest := pb.RemoveIndexRequest{ Cache: bc.name, Scope: bc.sessionOpts.Scope, Format: bc.format, Extractor: binExtractor} - _, err = bc.client.RemoveIndex(ctx, &removeIndexRequest) + _, err = bc.client.RemoveIndex(newCtx, &removeIndexRequest) return err } @@ -163,9 +179,14 @@ func executeTruncate[K comparable, V any](ctx context.Context, bc *baseClient[K, return err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + request := pb.TruncateRequest{Cache: bc.name, Scope: bc.sessionOpts.Scope} - _, err = bc.client.Truncate(ctx, &request) + _, err = bc.client.Truncate(newCtx, &request) return err } @@ -176,9 +197,14 @@ func executeDestroy[K comparable, V any](ctx context.Context, bc *baseClient[K, return err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + request := pb.DestroyRequest{Cache: bc.name, Scope: bc.sessionOpts.Scope} - _, err = bc.client.Destroy(ctx, &request) + _, err = bc.client.Destroy(newCtx, &request) if err != nil { return err } @@ -220,6 +246,11 @@ func executeContainsKey[K comparable, V any](ctx context.Context, bc *baseClient return false, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return false, err @@ -227,7 +258,7 @@ func executeContainsKey[K comparable, V any](ctx context.Context, bc *baseClient containsKeyRequest := pb.ContainsKeyRequest{Cache: bc.name, Key: binKey, Format: bc.format, Scope: bc.sessionOpts.Scope} - result, err = bc.client.ContainsKey(ctx, &containsKeyRequest) + result, err = bc.client.ContainsKey(newCtx, &containsKeyRequest) if err != nil { return false, err } @@ -245,6 +276,11 @@ func executeContainsValue[K comparable, V any](ctx context.Context, bc *baseClie return false, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binValue, err = bc.valueSerializer.Serialize(value) if err != nil { return false, err @@ -252,7 +288,7 @@ func executeContainsValue[K comparable, V any](ctx context.Context, bc *baseClie containsValueRequest := pb.ContainsValueRequest{Cache: bc.name, Value: binValue, Format: bc.format, Scope: bc.sessionOpts.Scope} - result, err = bc.client.ContainsValue(ctx, &containsValueRequest) + result, err = bc.client.ContainsValue(newCtx, &containsValueRequest) if err != nil { return false, err } @@ -271,6 +307,11 @@ func executeContainsEntry[K comparable, V any](ctx context.Context, bc *baseClie return false, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return false, err @@ -283,7 +324,7 @@ func executeContainsEntry[K comparable, V any](ctx context.Context, bc *baseClie containsEntryRequest := pb.ContainsEntryRequest{Cache: bc.name, Key: binKey, Value: binValue, Format: bc.format, Scope: bc.sessionOpts.Scope} - result, err = bc.client.ContainsEntry(ctx, &containsEntryRequest) + result, err = bc.client.ContainsEntry(newCtx, &containsEntryRequest) if err != nil { return false, err } @@ -300,9 +341,14 @@ func executeIsEmpty[K comparable, V any](ctx context.Context, bc *baseClient[K, return false, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + emptyRequest := pb.IsEmptyRequest{Cache: bc.name} - result, err = bc.client.IsEmpty(ctx, &emptyRequest) + result, err = bc.client.IsEmpty(newCtx, &emptyRequest) if err != nil { return false, err } @@ -321,6 +367,11 @@ func executeGet[K comparable, V any](ctx context.Context, bc *baseClient[K, V], return zeroValue, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { @@ -329,7 +380,7 @@ func executeGet[K comparable, V any](ctx context.Context, bc *baseClient[K, V], getRequest := pb.GetRequest{Key: binKey, Cache: bc.name, Format: bc.format, Scope: bc.sessionOpts.Scope} - result, err := bc.client.Get(ctx, &getRequest) + result, err := bc.client.Get(newCtx, &getRequest) if err != nil { return zeroValue, err } @@ -354,6 +405,8 @@ func executeGetAll[K comparable, V any](ctx context.Context, bc *baseClient[K, V return ch } + newCtx, cancel := bc.session.ensureContext(ctx) + // serialize the array of keys binKeys, err = serializeKeys[K](bc.keySerializer, keys) if err != nil { @@ -362,6 +415,10 @@ func executeGetAll[K comparable, V any](ctx context.Context, bc *baseClient[K, V } go func() { + if cancel != nil { + defer cancel() + } + var ( request = pb.GetAllRequest{Cache: bc.name, Key: binKeys, Format: bc.format, Scope: bc.sessionOpts.Scope} @@ -369,7 +426,7 @@ func executeGetAll[K comparable, V any](ctx context.Context, bc *baseClient[K, V value *V ) - getAllClient, err1 := bc.client.GetAll(ctx, &request) + getAllClient, err1 := bc.client.GetAll(newCtx, &request) if err1 != nil { ch <- &StreamedEntry[K, V]{Err: err1} close(ch) @@ -377,9 +434,7 @@ func executeGetAll[K comparable, V any](ctx context.Context, bc *baseClient[K, V } for { - var ( - response = new(pb.Entry) - ) + var response = new(pb.Entry) err1 = getAllClient.RecvMsg(response) if err1 == io.EOF { @@ -452,6 +507,11 @@ func executeAggregate[K comparable, V, R any](ctx context.Context, bc *baseClien return zeroValue, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + aggregatorSerializer := NewSerializer[any](bc.format) binAggregator, err = aggregatorSerializer.Serialize(aggr) if err != nil { @@ -481,7 +541,7 @@ func executeAggregate[K comparable, V, R any](ctx context.Context, bc *baseClien Format: bc.format, Scope: bc.sessionOpts.Scope, Aggregator: binAggregator, Filter: binFilter} - result, err = bc.client.Aggregate(ctx, &request) + result, err = bc.client.Aggregate(newCtx, &request) if err != nil { return zeroValue, err } @@ -503,8 +563,9 @@ func executeInvoke[K comparable, V any, R any](ctx context.Context, bc *baseClie return zeroValue, err } - if err != nil { - return zeroValue, err + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() } binKey, err = bc.keySerializer.Serialize(key) @@ -521,7 +582,7 @@ func executeInvoke[K comparable, V any, R any](ctx context.Context, bc *baseClie request := pb.InvokeRequest{Key: binKey, Cache: bc.name, Format: bc.format, Scope: bc.sessionOpts.Scope, Processor: binProcessor} - result, err = bc.client.Invoke(ctx, &request) + result, err = bc.client.Invoke(newCtx, &request) if err != nil { return zeroValue, err } @@ -544,6 +605,8 @@ func executeInvokeAllFilterOrKeys[K comparable, V any, R any](ctx context.Contex return ch } + newCtx, cancel := bc.session.ensureContext(ctx) + procSerializer := NewSerializer[any](bc.format) if binProcessor, err = procSerializer.Serialize(proc); err != nil { ch <- &StreamedValue[R]{Err: err} @@ -565,9 +628,12 @@ func executeInvokeAllFilterOrKeys[K comparable, V any, R any](ctx context.Contex } go func() { + if cancel != nil { + defer cancel() + } request := pb.InvokeAllRequest{Cache: bc.name, Filter: binFilter, Keys: binKeys, Processor: binProcessor, Format: bc.format, Scope: bc.sessionOpts.Scope} - valuesClient, err1 := bc.client.InvokeAll(ctx, &request) + valuesClient, err1 := bc.client.InvokeAll(newCtx, &request) resultSerializer := NewSerializer[R](bc.format) if err1 != nil { @@ -652,6 +718,8 @@ func executeKeySetFilter[K comparable, V any](ctx context.Context, bc *baseClien return ch } + newCtx, cancel := bc.session.ensureContext(ctx) + if fltr == nil { fltr = filters.Always() } @@ -662,9 +730,13 @@ func executeKeySetFilter[K comparable, V any](ctx context.Context, bc *baseClien } go func() { + if cancel != nil { + defer cancel() + } + request := pb.KeySetRequest{Cache: bc.name, Filter: binFilter, Format: bc.format, Scope: bc.sessionOpts.Scope} - valuesClient, err1 := bc.client.KeySet(ctx, &request) + valuesClient, err1 := bc.client.KeySet(newCtx, &request) if err1 != nil { ch <- &StreamedKey[K]{Err: err1} @@ -714,8 +786,13 @@ func executePutAll[K comparable, V any](ctx context.Context, bc *baseClient[K, V return err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + e := make([]*pb.Entry, len(entries)) - counter := 0 + counterPutAll := 0 for k, v := range entries { binKey, err = bc.keySerializer.Serialize(k) if err != nil { @@ -726,13 +803,13 @@ func executePutAll[K comparable, V any](ctx context.Context, bc *baseClient[K, V if err != nil { return err } - e[counter] = &pb.Entry{Key: binKey, Value: binValue} - counter++ + e[counterPutAll] = &pb.Entry{Key: binKey, Value: binValue} + counterPutAll++ } putAllRequest := pb.PutAllRequest{Entry: e, Cache: bc.name, Format: bc.format, Scope: bc.sessionOpts.Scope} - _, err = bc.client.PutAll(ctx, &putAllRequest) + _, err = bc.client.PutAll(newCtx, &putAllRequest) if err != nil { return err } @@ -754,6 +831,11 @@ func executePutIfAbsent[K comparable, V any](ctx context.Context, bc *baseClient return zeroValue, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return zeroValue, err @@ -767,7 +849,7 @@ func executePutIfAbsent[K comparable, V any](ctx context.Context, bc *baseClient putIfAbsentRequest := pb.PutIfAbsentRequest{Key: binKey, Value: binValue, Cache: bc.name, Format: bc.format, Scope: bc.sessionOpts.Scope} - result, err = bc.client.PutIfAbsent(ctx, &putIfAbsentRequest) + result, err = bc.client.PutIfAbsent(newCtx, &putIfAbsentRequest) if err != nil { return zeroValue, err } @@ -789,6 +871,11 @@ func executePutWithExpiry[K comparable, V any](ctx context.Context, bc *baseClie return zeroValue, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return zeroValue, err @@ -802,7 +889,7 @@ func executePutWithExpiry[K comparable, V any](ctx context.Context, bc *baseClie putRequest := pb.PutRequest{Key: binKey, Value: binValue, Cache: bc.name, Format: bc.format, Ttl: ttl.Milliseconds(), Scope: bc.sessionOpts.Scope} - result, err = bc.client.Put(ctx, &putRequest) + result, err = bc.client.Put(newCtx, &putRequest) if err != nil { return zeroValue, err } @@ -822,6 +909,11 @@ func executeRemove[K comparable, V any](ctx context.Context, bc *baseClient[K, V return zeroValue, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return zeroValue, err @@ -829,7 +921,7 @@ func executeRemove[K comparable, V any](ctx context.Context, bc *baseClient[K, V removeRequest := pb.RemoveRequest{Key: binKey, Cache: bc.name, Format: bc.format, Scope: bc.sessionOpts.Scope} - oldValue, err = bc.client.Remove(ctx, &removeRequest) + oldValue, err = bc.client.Remove(newCtx, &removeRequest) if err != nil { return zeroValue, err } @@ -850,6 +942,11 @@ func executeRemoveMapping[K comparable, V any](ctx context.Context, bc *baseClie return false, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return false, err @@ -862,7 +959,7 @@ func executeRemoveMapping[K comparable, V any](ctx context.Context, bc *baseClie request := pb.RemoveMappingRequest{Cache: bc.name, Key: binKey, Value: binValue, Format: bc.format, Scope: bc.sessionOpts.Scope} - result, err = bc.client.RemoveMapping(ctx, &request) + result, err = bc.client.RemoveMapping(newCtx, &request) if err != nil { return false, err } @@ -882,6 +979,11 @@ func executeReplace[K comparable, V any](ctx context.Context, bc *baseClient[K, return zeroValue, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return zeroValue, err @@ -894,7 +996,7 @@ func executeReplace[K comparable, V any](ctx context.Context, bc *baseClient[K, request := pb.ReplaceRequest{Key: binKey, Value: binValue, Cache: bc.name, Format: bc.format, Scope: bc.sessionOpts.Scope} - oldValue, err = bc.client.Replace(ctx, &request) + oldValue, err = bc.client.Replace(newCtx, &request) if err != nil { return zeroValue, err } @@ -916,6 +1018,11 @@ func executeReplaceMapping[K comparable, V any](ctx context.Context, bc *baseCli return false, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + binKey, err = bc.keySerializer.Serialize(key) if err != nil { return false, err @@ -934,7 +1041,7 @@ func executeReplaceMapping[K comparable, V any](ctx context.Context, bc *baseCli request := pb.ReplaceMappingRequest{Cache: bc.name, Key: binKey, PreviousValue: binPrevValue, NewValue: binNewValue, Format: bc.format, Scope: bc.sessionOpts.Scope} - result, err = bc.client.ReplaceMapping(ctx, &request) + result, err = bc.client.ReplaceMapping(newCtx, &request) if err != nil { return false, err } @@ -948,9 +1055,14 @@ func executeSize[K comparable, V any](ctx context.Context, bc *baseClient[K, V]) return 0, err } + newCtx, cancel := bc.session.ensureContext(ctx) + if cancel != nil { + defer cancel() + } + sizeRequest := pb.SizeRequest{Cache: bc.name} - size, err := bc.client.Size(ctx, &sizeRequest) + size, err := bc.client.Size(newCtx, &sizeRequest) if err != nil { return 0, err } @@ -960,9 +1072,8 @@ func executeSize[K comparable, V any](ctx context.Context, bc *baseClient[K, V]) // executeEntrySet executes the KeySet operation against a baseClient. func executeEntrySet[K comparable, V any](ctx context.Context, bc *baseClient[K, V]) <-chan *StreamedEntry[K, V] { var ( - err = bc.ensureClientConnection() - ch = make(chan *StreamedEntry[K, V]) - iter = newEntryPageIterator[K, V](ctx, bc) + err = bc.ensureClientConnection() + ch = make(chan *StreamedEntry[K, V]) ) if err != nil { @@ -970,6 +1081,8 @@ func executeEntrySet[K comparable, V any](ctx context.Context, bc *baseClient[K, return ch } + iter := newEntryPageIterator[K, V](ctx, bc) + go func() { for { result, err1 := iter.Next() @@ -1001,6 +1114,8 @@ func executeEntrySetFilter[K comparable, V any](ctx context.Context, bc *baseCli return ch } + newCtx, cancel := bc.session.ensureContext(ctx) + if fltr == nil { fltr = filters.Always() } @@ -1011,6 +1126,10 @@ func executeEntrySetFilter[K comparable, V any](ctx context.Context, bc *baseCli } go func() { + if cancel != nil { + defer cancel() + } + var ( request = pb.EntrySetRequest{Cache: bc.name, Filter: binFilter, Format: bc.format, Scope: bc.sessionOpts.Scope} @@ -1018,7 +1137,7 @@ func executeEntrySetFilter[K comparable, V any](ctx context.Context, bc *baseCli value *V ) - entrySetClient, err1 := bc.client.EntrySet(ctx, &request) + entrySetClient, err1 := bc.client.EntrySet(newCtx, &request) if err1 != nil { ch <- &StreamedEntry[K, V]{Err: err1} close(ch) @@ -1079,6 +1198,8 @@ func executeValues[K comparable, V any](ctx context.Context, bc *baseClient[K, V return ch } + newCtx, cancel := bc.session.ensureContext(ctx) + if fltr == nil { fltr = filters.Always() } @@ -1089,9 +1210,12 @@ func executeValues[K comparable, V any](ctx context.Context, bc *baseClient[K, V } go func() { + if cancel != nil { + defer cancel() + } request := pb.ValuesRequest{Cache: bc.name, Filter: binFilter, Format: bc.format, Scope: bc.sessionOpts.Scope} - valuesClient, err1 := bc.client.Values(ctx, &request) + valuesClient, err1 := bc.client.Values(newCtx, &request) if err1 != nil { ch <- &StreamedValue[V]{Err: err1} @@ -1133,9 +1257,8 @@ func executeValues[K comparable, V any](ctx context.Context, bc *baseClient[K, V // executeValuesNoFilter executes the Values operation against a baseClient when no filter is required. func executeValuesNoFilter[K comparable, V any](ctx context.Context, bc *baseClient[K, V]) <-chan *StreamedValue[V] { var ( - err = bc.ensureClientConnection() - ch = make(chan *StreamedValue[V]) - iter = newValuePageIterator[K, V](ctx, bc) + err = bc.ensureClientConnection() + ch = make(chan *StreamedValue[V]) ) if err != nil { @@ -1143,6 +1266,8 @@ func executeValuesNoFilter[K comparable, V any](ctx context.Context, bc *baseCli return ch } + iter := newValuePageIterator[K, V](ctx, bc) + go func() { for { result, err1 := iter.Next() diff --git a/coherence/doc.go b/coherence/doc.go index f1281ac..7ed5f16 100644 --- a/coherence/doc.go +++ b/coherence/doc.go @@ -61,6 +61,18 @@ Refer to the section on [NewSession] for more information on setting up a SSL co See [SessionOptions] which lists all the options supported by the [Session] API. +# Controlling connection timeouts + +Most operations you call require you to supply a [context.Context]. If your context does not contain a deadline, +the operation will wrap your context in a new [context.WithTimeout] using either the default timeout of 30,000 millis or +the value you set using option [coherence.WithSessionTimeout] when you called [NewSession]. + +For example, to override the default timeout of 30,000 millis with one of 5 seconds for a [Session] you can do the following: + + session, err = coherence.NewSession(ctx, coherence.WithSessionTimeout(time.Duration(5) * time.Second)) + +You can also override the default timeout using the environment variable COHERENCE_SESSION_TIMEOUT. + # Obtaining a NamedMap or NamedCache Once a session has been created, the [GetNamedMap](session, name, ...options) or [GetNamedCache](session, name, ...options) diff --git a/coherence/iterator.go b/coherence/iterator.go index a37e8c8..2cec108 100644 --- a/coherence/iterator.go +++ b/coherence/iterator.go @@ -101,9 +101,14 @@ func (it *streamedKeyIterator[K, V]) getNextPage() error { return err } + newCtx, cancel := it.bc.session.ensureContext(it.ctx) + if cancel != nil { + defer cancel() + } + request := &pb.PageRequest{Scope: it.bc.sessionOpts.Scope, Cache: it.bc.name, Format: it.bc.format, Cookie: it.cookie} - if client, err = it.bc.client.NextKeySetPage(it.ctx, request); err != nil { + if client, err = it.bc.client.NextKeySetPage(newCtx, request); err != nil { return err } @@ -221,9 +226,14 @@ func (it *streamedEntryIterator[K, V]) getNextPage() error { return err } + newCtx, cancel := it.bc.session.ensureContext(it.ctx) + if cancel != nil { + defer cancel() + } + request := &pb.PageRequest{Scope: it.bc.sessionOpts.Scope, Cache: it.bc.name, Format: it.bc.format, Cookie: it.cookie} - if client, err = it.bc.client.NextEntrySetPage(it.ctx, request); err != nil { + if client, err = it.bc.client.NextEntrySetPage(newCtx, request); err != nil { return err } diff --git a/coherence/named_cache_client.go b/coherence/named_cache_client.go index 10cb89e..0efd6be 100644 --- a/coherence/named_cache_client.go +++ b/coherence/named_cache_client.go @@ -575,7 +575,7 @@ func newNamedCache[K comparable, V any](session *Session, name string, sOpts *Se unlocked = true session.mutex.Unlock() - listener := newNamedCacheReconnectListener[K, V](session, *newCache) + listener := newNamedCacheReconnectListener[K, V](*newCache) newCache.namedCacheReconnectListener = *listener // unlock before adding reconnect listener @@ -591,7 +591,7 @@ type namedCacheReconnectListener[K comparable, V any] struct { } // newReconnectSessionListener creates a new namedCacheReconnectListener. -func newNamedCacheReconnectListener[K comparable, V any](session *Session, nc NamedCacheClient[K, V]) *namedCacheReconnectListener[K, V] { +func newNamedCacheReconnectListener[K comparable, V any](nc NamedCacheClient[K, V]) *namedCacheReconnectListener[K, V] { listener := namedCacheReconnectListener[K, V]{ listener: NewSessionLifecycleListener(), } @@ -599,7 +599,7 @@ func newNamedCacheReconnectListener[K comparable, V any](session *Session, nc Na listener.listener.OnReconnected(func(e SessionLifecycleEvent) { // re-register listeners for the NamedCache namedMap := convertNamedCacheClient[K, V](&nc) - if err := reRegisterListeners[K, V](session.sessionConnectCtx, &namedMap, &nc.baseClient); err != nil { + if err := reRegisterListeners[K, V](context.Background(), &namedMap, &nc.baseClient); err != nil { log.Println(err) } }) diff --git a/coherence/named_map_client.go b/coherence/named_map_client.go index 456398a..a54a8e5 100644 --- a/coherence/named_map_client.go +++ b/coherence/named_map_client.go @@ -547,8 +547,6 @@ func (nm *NamedMapClient[K, V]) KeySetFilter(ctx context.Context, fltr filters.F // log.Fatal(err) // } // -// iter := namedMap.KeySet(ctx) -// // ch := namedMap.KeySet(ctx) // for result := range ch { // if result.Err != nil { @@ -792,7 +790,7 @@ func newNamedMap[K comparable, V any](session *Session, name string, sOpts *Sess unlocked = true session.mutex.Unlock() - listener := newNamedMapReconnectListener[K, V](session, *newMap) + listener := newNamedMapReconnectListener[K, V](*newMap) newMap.namedMapReconnectListener = *listener // unlock before adding reconnect listener @@ -808,7 +806,7 @@ type namedMapReconnectListener[K comparable, V any] struct { } // newReconnectSessionListener creates new namedMapReconnectListener. -func newNamedMapReconnectListener[K comparable, V any](session *Session, nm NamedMapClient[K, V]) *namedMapReconnectListener[K, V] { +func newNamedMapReconnectListener[K comparable, V any](nm NamedMapClient[K, V]) *namedMapReconnectListener[K, V] { listener := namedMapReconnectListener[K, V]{ listener: NewSessionLifecycleListener(), } @@ -816,7 +814,7 @@ func newNamedMapReconnectListener[K comparable, V any](session *Session, nm Name listener.listener.OnReconnected(func(e SessionLifecycleEvent) { // re-register listeners for the NamedMap namedMap := convertNamedMapClient[K, V](&nm) - if err := reRegisterListeners[K, V](session.sessionConnectCtx, &namedMap, &nm.baseClient); err != nil { + if err := reRegisterListeners[K, V](context.Background(), &namedMap, &nm.baseClient); err != nil { log.Println(err) } }) diff --git a/coherence/session.go b/coherence/session.go index bb5e5fa..80a2b48 100644 --- a/coherence/session.go +++ b/coherence/session.go @@ -19,20 +19,23 @@ import ( "google.golang.org/grpc/credentials/insecure" "log" "os" + "strconv" "strings" "sync" + "time" ) // ErrInvalidFormat indicates that the serialization format can only be JSON. var ErrInvalidFormat = errors.New("format can only be 'json'") const ( - defaultFormat = "json" - mapOrCacheExists = "the %s %s already exists with different type parameters" + defaultFormat = "json" + mapOrCacheExists = "the %s %s already exists with different type parameters" + defaultSessionTimeout = "30000" // millis ) -// Session provides APIs to create NamedCaches. The NewSession() method creates a -// new instance of a Session. This method also takes a variable number of arguments, called options, +// Session provides APIs to create NamedCaches. The [NewSession] method creates a +// new instance of a [Session]. This method also takes a variable number of arguments, called options, // that can be passed to configure the Session. type Session struct { sessionID uuid.UUID @@ -60,6 +63,7 @@ type SessionOptions struct { ClientKeyPath string CaCertPath string PlainText bool + Timeout time.Duration } // NewSession creates a new Session with the specified sessionOptions. @@ -94,8 +98,8 @@ type SessionOptions struct { // export COHERENCE_TLS_CERTS_PATH=/path/to/cert/to/be/added/for/trust // export COHERENCE_IGNORE_INVALID_CERTS=true // option to ignore self-signed certificates - for testing only. Not to be used in production // -// Finally, the Close() method can be used to close the Session. Once a Session is closed, no APIs -// on the NamedMap instances should be invoked. If invoked they all will return an error. +// Finally, the Close() method can be used to close the [Session]. Once a [Session] is closed, no APIs +// on the [NamedMap] instances should be invoked. If invoked they will return an error. // [gRPC Naming]: https://github.com/grpc/grpc/blob/master/doc/naming.md // [gRPC Proxy Server]: https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/develop-remote-clients/using-coherence-grpc-server.html func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (*Session, error) { @@ -111,7 +115,8 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) ( lifecycleListeners: []*SessionLifecycleListener{}, sessOpts: &SessionOptions{ PlainText: false, - Format: defaultFormat}, + Format: defaultFormat, + Timeout: time.Duration(0) * time.Second}, } if getBoolValueFromEnvVarOrDefault(envSessionDebug, false) { @@ -135,9 +140,18 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) ( session.sessOpts.Address = getStringValueFromEnvVarOrDefault(envHostName, "localhost:1408") } + // if no timeout then use the env or default + if session.sessOpts.Timeout == time.Duration(0) { + timeoutString := getStringValueFromEnvVarOrDefault(envSessionTimeout, defaultSessionTimeout) + timeout, err := strconv.ParseInt(timeoutString, 10, 64) + if err != nil || timeout <= 0 { + return nil, fmt.Errorf("invalid value of %s for timeout", timeoutString) + } + session.sessOpts.Timeout = time.Duration(timeout) * time.Millisecond + } + // ensure initial connection - err := session.ensureConnection() - return session, err + return session, session.ensureConnection() } // WithAddress returns a function to set the address for session. @@ -169,6 +183,13 @@ func WithPlainText() func(sessionOptions *SessionOptions) { } } +// WithSessionTimeout returns a function to set the session timeout. +func WithSessionTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) { + return func(s *SessionOptions) { + s.Timeout = timeout + } +} + // ID returns the identifier of a session. func (s *Session) ID() string { return s.sessionID.String() @@ -190,6 +211,11 @@ func (s *Session) String() string { len(s.caches), len(s.maps), s.sessOpts) } +// GetSessionTimeout returns the session timeout in seconds. +func (s *Session) GetSessionTimeout() time.Duration { + return s.sessOpts.Timeout +} + // ensureConnection ensures a session has a valid connection func (s *Session) ensureConnection() error { if s.firstConnectAttempted { @@ -222,7 +248,12 @@ func (s *Session) ensureConnection() error { s.mutex.Lock() locked = true - conn, err := grpc.DialContext(s.sessionConnectCtx, s.sessOpts.Address, s.dialOptions...) + newCtx, cancel := s.ensureContext(s.sessionConnectCtx) + if cancel != nil { + defer cancel() + } + + conn, err := grpc.DialContext(newCtx, s.sessOpts.Address, s.dialOptions...) if err != nil { log.Printf("could not connect. Reason: %v", err) @@ -469,8 +500,8 @@ func validateFilePath(file string) error { // String returns a string representation of SessionOptions. func (s *SessionOptions) String() string { var sb = strings.Builder{} - sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v,", - s.Address, s.TLSEnabled, s.Scope, s.Format)) + sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, timeout=%v", + s.Address, s.TLSEnabled, s.Scope, s.Format, s.Timeout)) if s.TLSEnabled { sb.WriteString(fmt.Sprintf(" clientCertPath=%v, clientKeyPath=%v, caCertPath=%v,", @@ -490,3 +521,13 @@ func (s *Session) dispatch(eventType SessionLifecycleEventType, } } } + +// ensureContext will ensure that the context has deadline and if not will apply the timeout from the +// [SessionOptions]. +func (s *Session) ensureContext(ctx context.Context) (context.Context, context.CancelFunc) { + if _, ok := ctx.Deadline(); !ok { + // no deadline set, so wrap the context in a Timeout + return context.WithTimeout(ctx, s.sessOpts.Timeout) + } + return ctx, nil +} diff --git a/coherence/session_test.go b/coherence/session_test.go index 2cd2cf0..6c41173 100644 --- a/coherence/session_test.go +++ b/coherence/session_test.go @@ -9,7 +9,9 @@ package coherence import ( "context" "github.com/onsi/gomega" + "strconv" "testing" + "time" ) func TestSessionValidation(t *testing.T) { @@ -21,4 +23,15 @@ func TestSessionValidation(t *testing.T) { _, err = NewSession(ctx, WithFormat("not-json")) g.Expect(err).To(gomega.Equal(ErrInvalidFormat)) + + // test default timeout + timeout, _ := strconv.ParseInt(defaultSessionTimeout, 10, 64) + s, err := NewSession(ctx) + g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) + g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(timeout) * time.Millisecond)) + + // test setting a timeout + s, err = NewSession(ctx, WithSessionTimeout(time.Duration(33)*time.Millisecond)) + g.Expect(err).To(gomega.Not(gomega.HaveOccurred())) + g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(33) * time.Millisecond)) } diff --git a/test/e2e/standalone/named_map_test.go b/test/e2e/standalone/named_map_test.go index b0300f8..9db588c 100644 --- a/test/e2e/standalone/named_map_test.go +++ b/test/e2e/standalone/named_map_test.go @@ -99,6 +99,53 @@ func TestBasicCrudOperationsVariousTypes(t *testing.T) { map[int]string{1: "one", 2: "two", 3: "three"}) } +func TestSessionWithSpecifiedTimeout(t *testing.T) { + var ( + g = gomega.NewWithT(t) + err error + session *coherence.Session + ) + + session, err = GetSession(coherence.WithSessionTimeout(time.Duration(10) * time.Second)) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer session.Close() + + runTimeoutTest(g, session) +} + +func TestSessionWithEnvTimeout(t *testing.T) { + var ( + g = gomega.NewWithT(t) + err error + session *coherence.Session + ) + + t.Setenv("COHERENCE_SESSION_TIMEOUT", "10000") + + session, err = GetSession() + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer session.Close() + + runTimeoutTest(g, session) +} + +func runTimeoutTest(g *gomega.WithT, session *coherence.Session) { + // we should get an error as we should be > default timeout + namedMap := getNewNamedMap[int, string](g, session, "timeout-map") + err := namedMap.Clear(ctx) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + + namedCache := getNewNamedCache[int, string](g, session, "timeout-cache") + err = namedCache.Clear(ctx) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + + // create a new context with an existing deadline, it should be honored + ctxNew, cancel := context.WithTimeout(ctx, time.Duration(1)*time.Nanosecond) + defer cancel() + err = namedCache.Clear(ctxNew) + g.Expect(err).Should(gomega.HaveOccurred()) +} + // TestBasicCrudOperationsVariousTypesWithStructKey tests operations against caches that have keys and values as structs. func TestBasicCrudOperationsVariousTypesWithStructKey(t *testing.T) { var (