Skip to content

Commit

Permalink
Merge 275e97c into 93af2cf
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 22, 2018
2 parents 93af2cf + 275e97c commit 992f80d
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 16 deletions.
4 changes: 2 additions & 2 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestClientAddSub(t *testing.T) {
}

// Now register the client
sc, _ := cs.register(info)
cs.register(info)

// Now this should work
if !cs.addSub(clientID, sub) {
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestClientAddSub(t *testing.T) {
cs.register(info)
runtime.Gosched()
c, _ := cs.unregister(clientID)
if sc == nil {
if c == nil {
t.Fatal("Client should have been found")
}
c.RLock()
Expand Down
48 changes: 36 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2818,15 +2818,8 @@ func (s *StanServer) checkClientHealth(clientID string) {
func (s *StanServer) closeClient(clientID string) error {
s.closeMu.Lock()
defer s.closeMu.Unlock()
// Remove from our clientStore.
client, err := s.clients.unregister(clientID)
// The above call may return an error (due to storage) but still return
// the client that is being unregistered. So log the error and proceed.
if err != nil {
s.log.Errorf("Error unregistering client %q: %v", clientID, err)
}
// This would mean that the client was already unregistered or was never
// registered.
// Lookup client first, will unregister only after removing its subscriptions
client := s.clients.lookup(clientID)
if client == nil {
s.log.Errorf("Unknown client %q in close request", clientID)
return ErrUnknownClient
Expand All @@ -2835,6 +2828,11 @@ func (s *StanServer) closeClient(clientID string) error {
// Remove all non-durable subscribers.
s.removeAllNonDurableSubscribers(client)

// Remove from our clientStore.
if _, err := s.clients.unregister(clientID); err != nil {
s.log.Errorf("Error unregistering client %q: %v", clientID, err)
}

if s.debug {
client.RLock()
hbInbox := client.info.HbInbox
Expand Down Expand Up @@ -3892,9 +3890,12 @@ func (s *StanServer) removeAllNonDurableSubscribers(client *client) {
client.RLock()
subs := client.subs
client.RUnlock()
var storesToFlush map[string]stores.SubStore
for _, sub := range subs {
sub.RLock()
subject := sub.subject
isDurable := sub.IsDurable
subStore := sub.store
sub.RUnlock()
// Get the channel
c := s.channels.get(subject)
Expand All @@ -3903,6 +3904,22 @@ func (s *StanServer) removeAllNonDurableSubscribers(client *client) {
}
// Don't remove durables
c.ss.Remove(c, sub, false)
// If the sub is a durable, there may have been an update to storage,
// so we will want to flush the store. In clustering, during replay,
// subStore may be nil.
if isDurable && subStore != nil {
if storesToFlush == nil {
storesToFlush = make(map[string]stores.SubStore, 16)
}
storesToFlush[subject] = subStore
}
}
// Flush each sub store that was recorded for a durable subscription in the
// loop above. A flush failure is only logged, so the remaining stores are
// still flushed.
if len(storesToFlush) > 0 {
	// The log line has three verbs (client ID, subject, error); the client ID
	// operand was previously missing, which shifted the other two arguments.
	// Read the ID under the client's read lock, as done for client.subs.
	client.RLock()
	clientID := client.info.ID // NOTE(review): assumes client.info exposes an ID field - confirm
	client.RUnlock()
	for subject, subStore := range storesToFlush {
		if err := subStore.Flush(); err != nil {
			s.log.Errorf("[Client:%s] Error flushing store while removing subscriptions: subject=%s, err=%v", clientID, subject, err)
		}
	}
}
}

Expand Down Expand Up @@ -4005,10 +4022,10 @@ func (s *StanServer) unsubscribe(req *pb.UnsubscribeRequest, isSubClose bool) er
req.ClientID, action, req.Inbox)
return ErrInvalidSub
}
return s.unsubscribeSub(c, req.ClientID, action, sub, isSubClose)
return s.unsubscribeSub(c, req.ClientID, action, sub, isSubClose, true)
}

func (s *StanServer) unsubscribeSub(c *channel, clientID, action string, sub *subState, isSubClose bool) error {
func (s *StanServer) unsubscribeSub(c *channel, clientID, action string, sub *subState, isSubClose, shouldFlush bool) error {
// Remove from Client
if !s.clients.removeSub(clientID, sub) {
s.log.Errorf("[Client:%s] %s request for missing client", clientID, action)
Expand All @@ -4017,10 +4034,17 @@ func (s *StanServer) unsubscribeSub(c *channel, clientID, action string, sub *su
// Remove the subscription
unsubscribe := !isSubClose
c.ss.Remove(c, sub, unsubscribe)
var err error
if shouldFlush {
sub.RLock()
ss := sub.store
sub.RUnlock()
err = ss.Flush()
}
s.monMu.Lock()
s.numSubs--
s.monMu.Unlock()
return nil
return err
}

func (s *StanServer) replicateRemoveSubscription(req *pb.UnsubscribeRequest) error {
Expand Down
63 changes: 63 additions & 0 deletions server/server_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"flag"
"fmt"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -771,6 +772,14 @@ func (l *captureFatalLogger) Fatalf(format string, args ...interface{}) {
l.Unlock()
}

// storeNoClose embeds a stores.Store and replaces its Close method with a
// no-op. Every other method is promoted from the embedded store unchanged.
type storeNoClose struct {
	stores.Store
}

// Close does nothing and always returns nil; the embedded store's Close is
// not called through this wrapper.
func (*storeNoClose) Close() error {
	return nil
}

func TestGhostDurableSubs(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
Expand Down Expand Up @@ -826,9 +835,63 @@ func TestGhostDurableSubs(t *testing.T) {
s.Shutdown()
check(true)

// We cannot do the rest of this test on Windows because
// to simulate crash we don't close store and restart the
// server. This would not be allowed on Windows.
if runtime.GOOS == "windows" {
return
}

// Re-open, this time, there should not be the warning anymore.
opts.ClientHBInterval = 250 * time.Millisecond
opts.ClientHBTimeout = 100 * time.Millisecond
opts.ClientHBFailCount = 1
s = runServerWithOpts(t, opts, nil)
check(false)

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sc, err := stan.Connect(clusterName, clientName,
stan.NatsConn(nc), stan.ConnectWait(500*time.Millisecond))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc.Close()

ch := make(chan bool, 1)
// Create queue sub
if _, err := sc.QueueSubscribe("foo", "bar",
func(_ *stan.Msg) {
ch <- true
},
stan.DurableName("dur")); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
if err := Wait(ch); err != nil {
t.Fatal("Did not get our message")
}

// Close NATS connection to cause server to close client connection due
// to missed HBs.
nc.Close()

waitForNumClients(t, s, 0)

// Change store to simulate no flush on simulated crash
s.store = &storeNoClose{Store: s.store}
s.Shutdown()

// Re-open, there should be no warning.
s = runServerWithOpts(t, opts, nil)
check(false)

sc.Close()
}

func TestGetNATSOptions(t *testing.T) {
Expand Down
55 changes: 54 additions & 1 deletion server/server_storefailures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type mockedMsgStore struct {
type mockedSubStore struct {
stores.SubStore
sync.RWMutex
fail bool
fail bool
failFlushOnce bool
}

func (ms *mockedStore) CreateChannel(name string) (*stores.Channel, error) {
Expand Down Expand Up @@ -279,6 +280,16 @@ func (ss *mockedSubStore) DeleteSub(subid uint64) error {
return ss.SubStore.DeleteSub(subid)
}

// Flush returns a synthetic error when the failFlushOnce flag is set, and
// otherwise delegates to the embedded SubStore's Flush. The flag is sampled
// under the read lock and is not modified by this method.
func (ss *mockedSubStore) Flush() error {
	ss.RLock()
	injectFailure := ss.failFlushOnce
	ss.RUnlock()

	if !injectFailure {
		return ss.SubStore.Flush()
	}
	return fmt.Errorf("On purpose")
}

func TestDeleteSubFailures(t *testing.T) {
logger := &checkErrorLogger{checkErrorStr: "deleting subscription"}
opts := GetDefaultOptions()
Expand Down Expand Up @@ -428,6 +439,48 @@ func TestUpdateSubFailure(t *testing.T) {
}
}

// TestCloseClientWithDurableSubs verifies that the server logs an error when
// flushing the subscription store fails while a client that owns a durable
// queue subscription is being closed.
func TestCloseClientWithDurableSubs(t *testing.T) {
// The logger is configured with the "flushing store" fragment and its gotError
// flag is checked at the end of the test.
// NOTE(review): presumably gotError is set when a logged error contains
// checkErrorStr - confirm against checkErrorLogger.
logger := &checkErrorLogger{checkErrorStr: "flushing store"}
opts := GetDefaultOptions()
opts.CustomLogger = logger
s, err := RunServerWithOpts(opts, nil)
if err != nil {
t.Fatalf("Error running server: %v", err)
}
defer s.Shutdown()

// Wrap the channel store with the mock before any channel is created; the
// type assertion below relies on channel "foo" having a *mockedSubStore.
s.channels.Lock()
s.channels.store = &mockedStore{Store: s.channels.store}
s.channels.Unlock()

sc := NewDefaultConnection(t)
defer sc.Close()

// Create a durable queue subscription on "foo"; the message callback is a
// no-op since no message is published in this test.
if _, err := sc.QueueSubscribe("foo", "bar",
func(_ *stan.Msg) {},
stan.DurableName("dur")); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
waitForNumSubs(t, s, clientName, 1)

// Arm the mocked sub store so that its Flush returns an error.
cs := channelsGet(t, s.channels, "foo")
mss := cs.store.Subs.(*mockedSubStore)
mss.Lock()
mss.failFlushOnce = true
mss.Unlock()

// Close the client; the server should hit the injected failure when flushing
// the subscription store and log it.
sc.Close()
waitForNumClients(t, s, 0)

// The expected error must have been seen by the logger.
logger.Lock()
gotIt := logger.gotError
logger.Unlock()
if !gotIt {
t.Fatalf("Server did not log error on close client")
}
}

func TestSendMsgToSubStoreFailure(t *testing.T) {
logger := &checkErrorLogger{checkErrorStr: "add pending message"}
opts := GetDefaultOptions()
Expand Down
3 changes: 2 additions & 1 deletion server/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,11 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) {
sub.RLock()
clientID := sub.ClientID
sub.RUnlock()
if err := s.unsubscribeSub(c, clientID, "unsub", sub, false); err != nil {
if err := s.unsubscribeSub(c, clientID, "unsub", sub, false, false); err != nil {
return err
}
}
c.store.Subs.Flush()
}
for clientID := range s.clients.getClients() {
if _, err := s.clients.unregister(clientID); err != nil {
Expand Down

0 comments on commit 992f80d

Please sign in to comment.