From 96c7bdcee163282ac9fb533672789089ab8aa22c Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 22 Jan 2018 11:10:59 -0700 Subject: [PATCH] Fix tests related to MsgStore.Empty() --- stores/common.go | 3 --- stores/common_msg_test.go | 3 --- stores/common_test.go | 1 + stores/memstore.go | 9 +++++++-- stores/memstore_test.go | 35 +++++++++++++++++++++++++++++++++++ 5 files changed, 43 insertions(+), 8 deletions(-) diff --git a/stores/common.go b/stores/common.go index 95182d71..a4e1e046 100644 --- a/stores/common.go +++ b/stores/common.go @@ -277,9 +277,6 @@ func (gms *genericMsgStore) GetSequenceFromTimestamp(timestamp int64) (uint64, e // Empty implements the MsgStore interface func (gms *genericMsgStore) Empty() error { - gms.Lock() - gms.empty() - gms.Unlock() return nil } diff --git a/stores/common_msg_test.go b/stores/common_msg_test.go index db17ac5c..9b608983 100644 --- a/stores/common_msg_test.go +++ b/stores/common_msg_test.go @@ -658,9 +658,6 @@ func TestCSMsgStoreEmpty(t *testing.T) { for _, st := range testStores { st := st t.Run(st.name, func(t *testing.T) { - if !st.recoverable { - return - } t.Parallel() defer endTest(t, st) diff --git a/stores/common_test.go b/stores/common_test.go index 33b7f25e..0d04ac00 100644 --- a/stores/common_test.go +++ b/stores/common_test.go @@ -384,6 +384,7 @@ func TestGSNoOps(t *testing.T) { msgStoreLastMsg(t, gms) != nil || gms.Flush() != nil || msgStoreGetSequenceFromTimestamp(t, gms, 0) != 0 || + gms.Empty() != nil || gms.Close() != nil { t.Fatal("Expected no value since these should not be implemented for generic store") } diff --git a/stores/memstore.go b/stores/memstore.go index daea52b0..1be6e516 100644 --- a/stores/memstore.go +++ b/stores/memstore.go @@ -174,12 +174,11 @@ func (ms *MemoryMsgStore) GetSequenceFromTimestamp(timestamp int64) (uint64, err // limit's MaxAge. func (ms *MemoryMsgStore) expireMsgs() { ms.Lock() + defer ms.Unlock() if ms.closed { - ms.Unlock() ms.wg.Done() return } - defer ms.Unlock() now := time.Now().UnixNano() maxAge := int64(ms.limits.MaxAge) @@ -216,6 +215,12 @@ func (ms *MemoryMsgStore) removeFirstMsg() { // Empty implements the MsgStore interface func (ms *MemoryMsgStore) Empty() error { ms.Lock() + if ms.ageTimer != nil { + if ms.ageTimer.Stop() { + ms.wg.Done() + } + ms.ageTimer = nil + } ms.empty() ms.msgs = make(map[uint64]*pb.MsgProto) ms.Unlock() diff --git a/stores/memstore_test.go b/stores/memstore_test.go index e89667c2..5aed0c70 100644 --- a/stores/memstore_test.go +++ b/stores/memstore_test.go @@ -5,6 +5,7 @@ package stores import ( "reflect" "testing" + "time" ) func createDefaultMemStore(t tLogger) *MemoryStore { @@ -50,3 +51,37 @@ func TestMSNegativeLimitsOnCreate(t *testing.T) { t.Fatal("Should have failed to create store with a negative limit") } } + +func TestMSMsgStoreEmpty(t *testing.T) { + s := createDefaultMemStore(t) + defer s.Close() + + limits := StoreLimits{} + limits.MaxAge = 250 * time.Millisecond + if err := s.SetLimits(&limits); err != nil { + t.Fatalf("Error setting limits: %v", err) + } + + cs := storeCreateChannel(t, s, "foo") + + // Send some messages + for i := 0; i < 3; i++ { + storeMsg(t, cs, "foo", uint64(i+1), []byte("hello")) + } + // Then empty the message store + if err := cs.Msgs.Empty(); err != nil { + t.Fatalf("Error on Empty(): %v", err) + } + + ms := cs.Msgs.(*MemoryMsgStore) + ms.RLock() + if ms.ageTimer != nil { + ms.RUnlock() + t.Fatal("AgeTimer not nil") + } + if ms.first != 0 || ms.last != 0 { + ms.RUnlock() + t.Fatalf("First and/or Last not reset") + } + ms.RUnlock() +}