[FIXED] Clustering: possible deadlock on channel deletion
There was a race between the decision to delete a channel and
the processing of a subscription on that channel that could cause
the server to deadlock in clustering mode.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Feb 21, 2019
1 parent dec9ea4 commit 318a40f
Showing 6 changed files with 190 additions and 73 deletions.
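
The diff below removes the channelStore's separate delete mutex (delMu) and instead guards deletion with a preventDelete flag held under the store's single lock, so the subscription path no longer holds a deletion lock across the clustered (Raft) call. The following is a minimal, hypothetical Go sketch of that flag-under-one-lock pattern; it uses toy types and names, not the server's actual code:

```go
// Sketch only: illustrates guarding channel deletion with a preventDelete
// flag under a single lock, instead of a second deletion mutex that could
// be held across blocking calls. Types and names here are hypothetical.
package main

import (
	"fmt"
	"sync"
)

type channel struct {
	name          string
	preventDelete bool
}

type channelStore struct {
	sync.Mutex
	channels map[string]*channel
}

// lookupPreventDelete returns (creating if needed) the channel and marks it
// so a concurrent delete decision backs off until the subscription is set up.
func (cs *channelStore) lookupPreventDelete(name string) *channel {
	cs.Lock()
	defer cs.Unlock()
	c := cs.channels[name]
	if c == nil {
		c = &channel{name: name}
		cs.channels[name] = c
	}
	c.preventDelete = true
	return c
}

// turnOffPreventDelete clears the flag once the subscription path is done.
func (cs *channelStore) turnOffPreventDelete(c *channel) {
	cs.Lock()
	c.preventDelete = false
	cs.Unlock()
}

// processDelete skips channels that a subscription is currently protecting.
func (cs *channelStore) processDelete(name string) {
	cs.Lock()
	defer cs.Unlock()
	c := cs.channels[name]
	if c == nil || c.preventDelete {
		return
	}
	delete(cs.channels, name)
}

func main() {
	cs := &channelStore{channels: make(map[string]*channel)}
	c := cs.lookupPreventDelete("foo")
	cs.processDelete("foo") // no-op: deletion prevented while the sub is registered
	cs.turnOffPreventDelete(c)
	cs.processDelete("foo") // now the channel can go away
	fmt.Println("channels left:", len(cs.channels))
}
```
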
2 changes: 1 addition & 1 deletion .travis.yml
@@ -23,6 +23,6 @@ before_script:
script:
- mysql -u root -e "CREATE USER 'nss'@'localhost' IDENTIFIED BY 'password'; GRANT ALL PRIVILEGES ON *.* TO 'nss'@'localhost'; CREATE DATABASE test_nats_streaming;"
- go test -i $EXCLUDE_VENDOR
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else go test $EXCLUDE_VENDOR; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast $EXCLUDE_VENDOR; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]] && [ "$TRAVIS_TAG" != "" ]; then ./scripts/cross_compile.sh $TRAVIS_TAG; ghr --owner nats-io --token $GITHUB_TOKEN --draft --replace $TRAVIS_TAG pkg/; fi
24 changes: 12 additions & 12 deletions scripts/cov.sh
@@ -3,18 +3,18 @@

rm -rf ./cov
mkdir cov
go test -v -covermode=atomic -coverprofile=./cov/server.out -coverpkg=./server,./stores,./util ./server
go test -v -covermode=atomic -coverprofile=./cov/server2.out -coverpkg=./server,./stores,./util -run=TestPersistent ./server -encrypt
go test -v -covermode=atomic -coverprofile=./cov/logger.out ./logger
go test -v -covermode=atomic -coverprofile=./cov/stores1.out ./stores
go test -v -covermode=atomic -coverprofile=./cov/stores2.out -run=TestCS/FILE ./stores -fs_no_buffer
go test -v -covermode=atomic -coverprofile=./cov/stores3.out -run=TestCS/FILE ./stores -fs_set_fds_limit
go test -v -covermode=atomic -coverprofile=./cov/stores4.out -run=TestCS/FILE ./stores -fs_no_buffer -fs_set_fds_limit
go test -v -covermode=atomic -coverprofile=./cov/stores5.out -run=TestFS ./stores -fs_no_buffer
go test -v -covermode=atomic -coverprofile=./cov/stores6.out -run=TestFS ./stores -fs_set_fds_limit
go test -v -covermode=atomic -coverprofile=./cov/stores7.out -run=TestFS ./stores -fs_no_buffer -fs_set_fds_limit
go test -v -covermode=atomic -coverprofile=./cov/stores8.out -run=TestCS ./stores -encrypt
go test -v -covermode=atomic -coverprofile=./cov/util.out ./util
go test -v -failfast -covermode=atomic -coverprofile=./cov/server.out -coverpkg=./server,./stores,./util ./server
go test -v -failfast -covermode=atomic -coverprofile=./cov/server2.out -coverpkg=./server,./stores,./util -run=TestPersistent ./server -encrypt
go test -v -failfast -covermode=atomic -coverprofile=./cov/logger.out ./logger
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores1.out ./stores
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores2.out -run=TestCS/FILE ./stores -fs_no_buffer
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores3.out -run=TestCS/FILE ./stores -fs_set_fds_limit
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores4.out -run=TestCS/FILE ./stores -fs_no_buffer -fs_set_fds_limit
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores5.out -run=TestFS ./stores -fs_no_buffer
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores6.out -run=TestFS ./stores -fs_set_fds_limit
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores7.out -run=TestFS ./stores -fs_no_buffer -fs_set_fds_limit
go test -v -failfast -covermode=atomic -coverprofile=./cov/stores8.out -run=TestCS ./stores -encrypt
go test -v -failfast -covermode=atomic -coverprofile=./cov/util.out ./util
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov

79 changes: 79 additions & 0 deletions server/clustering_test.go
@@ -4092,3 +4092,82 @@ func TestClusteringDeleteChannelLeaksSnapshotSubs(t *testing.T) {
t.Fatalf("Snapshot subscription for channel still exists")
}
}

func TestClusteringDeadlockOnChannelDelete(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

maxInactivity := 1000 * time.Millisecond

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.MaxInactivity = maxInactivity
s1 := runServerWithOpts(t, s1sOpts, nil)
// No defer in case deadlock is detected, it would
// prevent the print of t.Fatalf()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.MaxInactivity = maxInactivity
s2 := runServerWithOpts(t, s2sOpts, nil)
// No defer in case deadlock is detected, it would
// prevent the print of t.Fatalf()

servers := []*StanServer{s1, s2}
// Wait for leader to be elected.
leader := getLeader(t, 10*time.Second, servers...)

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Unable to connect")
}
defer nc.Close()

leader.mu.RLock()
newSubSubject := leader.info.Subscribe
leader.mu.RUnlock()

req := pb.SubscriptionRequest{
ClientID: "me",
AckWaitInSecs: 30,
Inbox: nats.NewInbox(),
MaxInFlight: 1,
}

for i := 0; i < 1000; i++ {
leader.lookupOrCreateChannel(fmt.Sprintf("foo.%d", i))
}

time.Sleep(990 * time.Millisecond)

for i := 0; i < 1000; i++ {
req.Subject = fmt.Sprintf("foo.%d", i)
b, _ := req.Marshal()
nc.Publish(newSubSubject, b)
}

ch := make(chan struct{}, 1)
go func() {
for {
if leader.channels.count() != 0 {
time.Sleep(15 * time.Millisecond)
continue
}
ch <- struct{}{}
return
}
}()
select {
case <-ch:
s2.Shutdown()
s1.Shutdown()
case <-time.After(5 * time.Second):
t.Fatalf("Deadlock likely!!!")
}
}
142 changes: 90 additions & 52 deletions server/server.go
@@ -259,7 +259,6 @@ func (state State) String() string {

type channelStore struct {
sync.RWMutex
delMu sync.Mutex
channels map[string]*channel
store stores.Store
stan *StanServer
@@ -283,13 +282,21 @@ func (cs *channelStore) get(name string) *channel {

func (cs *channelStore) createChannel(s *StanServer, name string) (*channel, error) {
cs.Lock()
defer cs.Unlock()
c, err := cs.createChannelLocked(s, name)
cs.Unlock()
return c, err
}

func (cs *channelStore) createChannelLocked(s *StanServer, name string) (*channel, error) {
// It is possible that there were 2 concurrent calls to lookupOrCreateChannel
// which first uses `channelStore.get()` and if not found, calls this function.
// So we need to check now that we have the write lock that the channel has
// not already been created.
c := cs.channels[name]
if c != nil {
if c.activity != nil && c.activity.deleteInProgress {
return nil, ErrChanDelInProgress
}
return c, nil
}
sc, err := cs.store.CreateChannel(name)
@@ -377,25 +384,29 @@ func (cs *channelStore) count() int {
return count
}

func (cs *channelStore) lockDelete() {
cs.delMu.Lock()
}

func (cs *channelStore) unlockDelete() {
cs.delMu.Unlock()
}

func (cs *channelStore) maybeStartChannelDeleteTimer(name string, c *channel) {
cs.delMu.Lock()
cs.RLock()
cs.Lock()
if c == nil {
c = cs.channels[name]
}
if c != nil && c.activity != nil && !c.activity.deleteInProgress && !c.ss.hasActiveSubs() {
c.startDeleteTimer()
}
cs.RUnlock()
cs.delMu.Unlock()
cs.Unlock()
}

func (cs *channelStore) stopDeleteTimer(c *channel) {
cs.Lock()
c.stopDeleteTimer()
cs.Unlock()
}

func (cs *channelStore) turnOffPreventDelete(c *channel) {
cs.Lock()
if c != nil && c.activity != nil {
c.activity.preventDelete = false
}
cs.Unlock()
}

type channel struct {
@@ -414,19 +425,20 @@ type channelActivity struct {
maxInactivity time.Duration
timer *time.Timer
deleteInProgress bool
preventDelete bool
timerSet bool
}

// Starts the delete timer that when firing will post
// a channel delete request to the ioLoop.
// The channelStore's delMu mutex must be held on entry.
// The channelStore's mutex must be held on entry.
func (c *channel) startDeleteTimer() {
c.activity.last = time.Now()
c.resetDeleteTimer(c.activity.maxInactivity)
}

// Stops the delete timer.
// The channelStore's delMu mutex must be held on entry.
// The channelStore's mutex must be held on entry.
func (c *channel) stopDeleteTimer() {
if c.activity.timer != nil {
c.activity.timer.Stop()
@@ -709,14 +721,44 @@ func (sa *subSentAndAck) reset() {

// Looks up, or create a new channel if it does not exist
func (s *StanServer) lookupOrCreateChannel(name string) (*channel, error) {
c := s.channels.get(name)
cs := s.channels
cs.RLock()
c := cs.channels[name]
if c != nil {
if c.activity != nil && c.activity.deleteInProgress {
cs.RUnlock()
return nil, ErrChanDelInProgress
}
cs.RUnlock()
return c, nil
}
return s.channels.createChannel(s, name)
cs.RUnlock()
return cs.createChannel(s, name)
}

func (s *StanServer) lookupOrCreateChannelPreventDelete(name string) (*channel, bool, error) {
cs := s.channels
cs.Lock()
c := cs.channels[name]
if c != nil {
if c.activity != nil && c.activity.deleteInProgress {
cs.Unlock()
return nil, false, ErrChanDelInProgress
}
} else {
var err error
c, err = cs.createChannelLocked(s, name)
if err != nil {
cs.Unlock()
return nil, false, err
}
}
if c.activity != nil {
c.activity.preventDelete = true
c.stopDeleteTimer()
}
cs.Unlock()
return c, true, nil
}

// createSubStore creates a new instance of `subStore`.
@@ -1974,9 +2016,7 @@ func (s *StanServer) leadershipLost() {
c.snapshotSub = nil
}
if c.activity != nil {
s.channels.lockDelete()
c.stopDeleteTimer()
s.channels.unlockDelete()
s.channels.stopDeleteTimer(c)
}
}

@@ -2596,13 +2636,12 @@ func (s *StanServer) replicateDeleteChannel(channel string) {
func (s *StanServer) handleChannelDelete(c *channel) {
delete := false
cs := s.channels
cs.lockDelete()
cs.Lock()
a := c.activity
if a.deleteInProgress || c.ss.hasActiveSubs() {
if a.preventDelete || a.deleteInProgress || c.ss.hasActiveSubs() {
if s.debug {
s.log.Debugf("Channel %q cannot be deleted: inProgress=%v hasActiveSubs=%v",
c.name, a.deleteInProgress, c.ss.hasActiveSubs())
s.log.Debugf("Channel %q cannot be deleted: preventDelete=%v inProgress=%v hasActiveSubs=%v",
c.name, a.preventDelete, a.deleteInProgress, c.ss.hasActiveSubs())
}
c.stopDeleteTimer()
} else {
@@ -2633,7 +2672,6 @@ func (s *StanServer) handleChannelDelete(c *channel) {
}
}
cs.Unlock()
cs.unlockDelete()
if delete {
if testDeleteChannel {
time.Sleep(time.Second)
@@ -2649,31 +2687,33 @@
// Actual deletion of the channel.
func (s *StanServer) processDeleteChannel(channel string) {
cs := s.channels
cs.lockDelete()
defer cs.unlockDelete()
cs.Lock()
defer cs.Unlock()
c := cs.channels[channel]
if c == nil {
s.log.Errorf("Error deleting channel %q: not found", channel)
return
}
if c.activity != nil && c.activity.preventDelete {
s.log.Errorf("The channel %q cannot be deleted at this time since a subscription has been created", channel)
return
}
// Delete from store
if err := cs.store.DeleteChannel(channel); err != nil {
s.log.Errorf("Error deleting channel %q: %v", channel, err)
c := cs.channels[channel]
if c != nil && c.activity != nil {
if c.activity != nil {
c.activity.deleteInProgress = false
c.startDeleteTimer()
}
return
}
// If no error, remove channel
c := s.channels.channels[channel]
if c != nil {
// If there was a subscription for snapshots requests,
// we need to unsubscribe.
if c.snapshotSub != nil {
c.snapshotSub.Unsubscribe()
c.snapshotSub = nil
}
delete(s.channels.channels, channel)
// If there was a subscription for snapshots requests,
// we need to unsubscribe.
if c.snapshotSub != nil {
c.snapshotSub.Unsubscribe()
c.snapshotSub = nil
}
delete(s.channels.channels, channel)
s.log.Noticef("Channel %q has been deleted", channel)
}

@@ -4531,13 +4571,16 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
ackInbox = nats.NewInbox()
)

c, err := s.lookupOrCreateChannel(sr.Subject)
// Lookup/create the channel and prevent it from being deleted
// until we are done with this subscription. This will also stop
// the delete timer if one was set.
c, preventDelete, err := s.lookupOrCreateChannelPreventDelete(sr.Subject)
// Immediately register a defer action to turn off preventing the
// deletion of the channel if it was turned on
if preventDelete {
defer s.channels.turnOffPreventDelete(c)
}
if err == nil {
// Keep the channel delete mutex to ensure that channel cannot be
// deleted while we are about to add a subscription.
s.channels.lockDelete()
defer s.channels.unlockDelete()

// If clustered, thread operations through Raft.
if s.isClustered {
// For start requests other than SequenceStart, we MUST convert the request
Expand Down Expand Up @@ -4569,15 +4612,10 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
}
}
if err != nil {
s.channels.maybeStartChannelDeleteTimer(sr.Subject, c)
s.sendSubscriptionResponseErr(m.Reply, err)
return
}
// If the channel has a MaxInactivity limit, stop the timer since we know that there
// is at least one active subscription.
if c.activity != nil {
// We are under the channelStore delete mutex
c.stopDeleteTimer()
}

// In case this is a durable, sub already exists so we need to protect access
sub.Lock()