Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions coherence/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"log"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -697,6 +698,7 @@ type mapEventManager[K comparable, V any] struct {
lifecycleListeners []*MapLifecycleListener[K, V]
pendingRegistrations map[string]*pendingListenerOp[K, V]
eventStream *eventStream
mutex sync.RWMutex
}

// pendingListenerOp is a simple holder for the listener
Expand Down Expand Up @@ -731,6 +733,9 @@ func newMapEventManager[K comparable, V any](namedMap *NamedMap[K, V], bc baseCl

// close closes the event stream.
func (m *mapEventManager[K, V]) close() {
m.mutex.Lock()
defer m.mutex.Unlock()

if m.eventStream != nil {
m.eventStream.cancel()
}
Expand All @@ -746,6 +751,9 @@ func (m *mapEventManager[K, V]) close() {

// addLifecycleListener adds the specified [MapLifecycleListener].
func (m *mapEventManager[K, V]) addLifecycleListener(listener MapLifecycleListener[K, V]) {
m.mutex.Lock()
defer m.mutex.Unlock()

for _, e := range m.lifecycleListeners {
if *e == listener {
return
Expand All @@ -756,6 +764,9 @@ func (m *mapEventManager[K, V]) addLifecycleListener(listener MapLifecycleListen

// removeLifecycleListener removes the specified [MapLifecycleListener].
func (m *mapEventManager[K, V]) removeLifecycleListener(listener MapLifecycleListener[K, V]) {
m.mutex.Lock()
defer m.mutex.Unlock()

idx := -1
listeners := m.lifecycleListeners
for i, c := range listeners {
Expand All @@ -774,6 +785,9 @@ func (m *mapEventManager[K, V]) removeLifecycleListener(listener MapLifecycleLis
// to the Coherence cluster that an event may omit the old and new
// values when emitting a MapEvent.
func (m *mapEventManager[K, V]) addKeyListener(ctx context.Context, listener MapListener[K, V], key K, lite bool) error {
m.mutex.Lock()
defer m.mutex.Unlock()

group, lPresent := m.keyListeners[key]
if !lPresent {
groupInner, err := makeKeyListenerGroup(m, key)
Expand All @@ -789,6 +803,9 @@ func (m *mapEventManager[K, V]) addKeyListener(ctx context.Context, listener Map

// removeKeyListener removes the specified key-based listener.
func (m *mapEventManager[K, V]) removeKeyListener(ctx context.Context, listener MapListener[K, V], key K) error {
m.mutex.Lock()
defer m.mutex.Unlock()

group, lPresent := m.keyListeners[key]
if lPresent {
return group.removeListener(ctx, listener)
Expand All @@ -800,6 +817,9 @@ func (m *mapEventManager[K, V]) removeKeyListener(ctx context.Context, listener
// to the Coherence cluster that an event may omit the old and new
// values when emitting a MapEvent.
func (m *mapEventManager[K, V]) addFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter, lite bool) error {
m.mutex.Lock()
defer m.mutex.Unlock()

filterLocal := filter
if filterLocal == nil {
filterLocal = defaultFilter
Expand All @@ -820,6 +840,9 @@ func (m *mapEventManager[K, V]) addFilterListener(ctx context.Context, listener

// removeFilterListener removes the specified filter-based listener.
func (m *mapEventManager[K, V]) removeFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error {
m.mutex.Lock()
defer m.mutex.Unlock()

filterLocal := filter
if filterLocal == nil {
filterLocal = defaultFilter
Expand All @@ -836,6 +859,9 @@ func (m *mapEventManager[K, V]) removeFilterListener(ctx context.Context, listen
// managing MapEvents raised by Coherence.
func (m *mapEventManager[K, V]) ensureStream() (*eventStream, error) {
if m.eventStream == nil {
m.mutex.Lock()
defer m.mutex.Unlock()

// because the event stream is for the lifetime of the cache,
// we use context.Background() and ignore any user provided
// timeouts
Expand Down Expand Up @@ -959,8 +985,7 @@ func (m *mapEventManager[K, V]) newSubscribeRequest(requestType string) proto.Ma
}
}

func (m *mapEventManager[K, V]) dispatch(eventType MapLifecycleEventType,
creator func() MapLifecycleEvent[K, V]) {
func (m *mapEventManager[K, V]) dispatch(eventType MapLifecycleEventType, creator func() MapLifecycleEvent[K, V]) {
if len(m.lifecycleListeners) > 0 {
event := creator()
for _, l := range m.lifecycleListeners {
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/standalone/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func TestMapAndLifecycleEventsAll(t *testing.T) {
// TestEventDisconnect tests to ensure that if we get a disconnect, then we can
func TestEventDisconnect(t *testing.T) {
t.Setenv("COHERENCE_SESSION_DEBUG", "true")
t.Skip("Skipping test temporarily while sorting out reconnect issue")
//g, session := initTest(t)
g, session := initTest(t,
coherence.WithDisconnectTimeout(time.Duration(130)*time.Second),
Expand All @@ -125,6 +126,7 @@ func TestEventDisconnect(t *testing.T) {
// as we have stopped the gRPC proxy before the test runs.
func TestEventDisconnectWithReadyTimeoutDelay(t *testing.T) {
t.Setenv("COHERENCE_SESSION_DEBUG", "true")
t.Skip("Skipping test temporarily while sorting out reconnect issue")

fmt.Println("Issue stop of $GRPC:GrpcProxy")
_, err := IssuePostRequest("http://127.0.0.1:30000/management/coherence/cluster/services/$GRPC:GrpcProxy/members/1/stop")
Expand Down