Skip to content

Commit

Permalink
[FIXED] Cluster: Handling of deleted channels
Browse files Browse the repository at this point in the history
A channel could be deleted and then later recreated (same name).
This is not handled properly in some cases such as restore from
snapshot.
This PR attempts to fix the problem by storing in the RAFT db
a UID when a channel is created. This UID is added to some of
the replicated operations.

Resolves #1008
Resolves #1010

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jan 23, 2020
1 parent d7aed78 commit bff9895
Show file tree
Hide file tree
Showing 8 changed files with 1,532 additions and 773 deletions.
176 changes: 140 additions & 36 deletions server/clustering.go
@@ -1,4 +1,4 @@
// Copyright 2017-2019 The NATS Authors
// Copyright 2017-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -100,8 +100,8 @@ type replicatedSub struct {

type raftFSM struct {
sync.Mutex
snapshotsOnInit int
server *StanServer
restoreFromInit bool
}

// shutdown attempts to stop the Raft node.
Expand Down Expand Up @@ -297,6 +297,20 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
if err != nil {
return false, err
}

// Get through the list of channels that we have recovered from streaming store
// and set their corresponding UID.
s.channels.Lock()
for cname, c := range s.channels.channels {
uid, err := store.GetChannelUID(cname)
if err != nil {
s.channels.Unlock()
return false, err
}
c.uid = uid
}
s.channels.Unlock()

cacheStore, err := raft.NewLogCache(s.opts.Clustering.LogCacheSize, store)
if err != nil {
store.Close()
Expand All @@ -309,7 +323,7 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
if runningInTests {
config.ElectionTimeout = 100 * time.Millisecond
config.HeartbeatTimeout = 100 * time.Millisecond
config.LeaderLeaseTimeout = 50 * time.Millisecond
config.LeaderLeaseTimeout = 100 * time.Millisecond
} else {
if s.opts.Clustering.RaftHeartbeatTimeout == 0 {
s.opts.Clustering.RaftHeartbeatTimeout = defaultRaftHBTimeout
Expand Down Expand Up @@ -359,9 +373,15 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
config.NotifyCh = raftNotifyCh

fsm := &raftFSM{server: s}
fsm.Lock()
fsm.snapshotsOnInit = len(sl)
fsm.Unlock()
// We want to know in snapshot.go:Restore() if we are called from NewRaft() or
// at runtime when catching up with the leader. To do so we will set this boolean
// if there were more than one snapshot before the call. The boolean will be cleared
// in Restore() itself.
if len(sl) > 0 {
fsm.Lock()
fsm.restoreFromInit = true
fsm.Unlock()
}
s.raft.fsm = fsm
node, err := raft.NewRaft(config, fsm, cacheStore, store, snapshotStore, transport)
if err != nil {
Expand Down Expand Up @@ -513,40 +533,42 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
switch op.OpType {
case spb.RaftOperation_Publish:
// Message replication.
var (
c *channel
err error
)
for _, msg := range op.PublishBatch.Messages {
// This is a batch for a given channel, so lookup channel once.
if c == nil {
c, err = s.lookupOrCreateChannel(msg.Subject)
// That should not be the case, but if it happens,
// just bail out.
if err == ErrChanDelInProgress {
return nil
} else if err == nil && !c.lSeqChecked {
// If msg.Sequence is > 1, then make sure we have no gap.
if msg.Sequence > 1 {
// We pass `1` for the `first` sequence. The function we call
// will do the right thing when it comes to restore possible
// missing messages.
err = s.raft.fsm.restoreMsgsFromSnapshot(c, 1, msg.Sequence-1, true)
}
if err == nil {
c.lSeqChecked = true
}
if len(op.PublishBatch.Messages) == 0 {
return nil
}
// This is a batch for a given channel, so lookup channel once.
msg := op.PublishBatch.Messages[0]
c, err := r.lookupOrCreateChannel(msg.Subject, op.ChannelUID, onlyIfNotExist)
if err != nil {
goto FATAL_ERROR
}
// `c` will be nil if the existing channel does not have the same UID.
// This could be the case in RAFT log replay if we have several "versions"
// of the same channel. If that is the case, simply skip here.
if c == nil {
return nil
}
if !c.lSeqChecked {
// If msg.Sequence is > 1, then make sure we have no gap.
if msg.Sequence > 1 {
// We pass `1` for the `first` sequence. The function we call
// will do the right thing when it comes to restore possible
// missing messages.
if err = s.raft.fsm.restoreMsgsFromSnapshot(c, 1, msg.Sequence-1, true); err != nil {
goto FATAL_ERROR
}
}
if err == nil {
_, err = c.store.Msgs.Store(msg)
}
if err != nil {
panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v",
msg.Sequence, msg.Subject, err))
c.lSeqChecked = true
}
for _, msg = range op.PublishBatch.Messages {
if _, err = c.store.Msgs.Store(msg); err != nil {
goto FATAL_ERROR
}
}
return c.store.Msgs.Flush()
FATAL_ERROR:
panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v",
msg.Sequence, msg.Subject, err))
case spb.RaftOperation_Connect:
// Client connection create replication.
return s.processConnect(op.ClientConnect.Request, op.ClientConnect.Refresh)
Expand All @@ -555,7 +577,15 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
return s.closeClient(op.ClientDisconnect.ClientID)
case spb.RaftOperation_Subscribe:
// Subscription replication.
sub, err := s.processSub(nil, op.Sub.Request, op.Sub.AckInbox, op.Sub.ID)
c, err := r.lookupOrCreateChannel(op.Sub.Request.Subject, op.ChannelUID, onlyIfNotExist)
if c == nil && err == nil {
err = fmt.Errorf("unable to process subscription on channel %q, wrong UID %q",
op.Sub.Request.Subject, op.ChannelUID)
}
if err != nil {
return &replicatedSub{sub: nil, err: err}
}
sub, err := s.processSub(c, op.Sub.Request, op.Sub.AckInbox, op.Sub.ID)
return &replicatedSub{sub: sub, err: err}
case spb.RaftOperation_RemoveSubscription:
fallthrough
Expand All @@ -572,9 +602,83 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
}
return nil
case spb.RaftOperation_DeleteChannel:
// Delete only if the channel exists and has the same UID.
if r.lookupChannel(op.Channel, op.ChannelUID) == nil {
return nil
}
s.processDeleteChannel(op.Channel)
return nil
default:
panic(fmt.Sprintf("unknown op type %s", op.OpType))
}
}

// This returns a channel object only if it is found with the proper UID.
func (r *raftFSM) lookupChannel(name, uid string) *channel {
s := r.server
cs := s.channels
cs.RLock()
defer cs.RUnlock()

c := cs.channels[name]
// Consider no UID (either in channel or from param) to be a match.
if c != nil && (uid == "" || c.uid == "" || c.uid == uid) {
return c
}
// No channel, or wrong UID
return nil
}

const (
replaceChannel = 1
onlyIfNotExist = 2
)

// Returns the channel with this name and UID.
//
// If the channel exists but has a different UID, behavior is altered by `mode`:
// `replaceChannel`: means that the current channel will be deleted before
// being created with given UID.
// `onlyIfNotExist`: means that `nil` will be returned (to indicate that the
// channel does not match.
// If the channel does not exist, the channel is created (the `mode` does not
// come into play).
//
// Note that to support existing streaming and RAFT stores, the given `uid` may
// be empty when processing existing RAFT snapshots/logs, or the streaming
// channel may not have an UID. In any of those cases, the existing channel
// object is returned.
func (r *raftFSM) lookupOrCreateChannel(name, uid string, mode int) (*channel, error) {
s := r.server
cs := s.channels
cs.Lock()
defer cs.Unlock()

c := cs.channels[name]
if c != nil {
// Consider no UID (either in channel or from param) to be a match.
if uid == "" || c.uid == "" || c.uid == uid {
return c, nil
}
// This is a no match, so unless allowed to replace, return nil here.
if mode != replaceChannel {
return nil, nil
}
// Delete current channel from streaming and RAFT stores.
err := cs.store.DeleteChannel(name)
if err == nil {
err = s.raft.store.DeleteChannelUID(name)
}
if err != nil {
s.log.Errorf("Error deleting channel %q: %v", name, err)
if s.isLeader() && c.activity != nil {
c.activity.deleteInProgress = false
c.startDeleteTimer()
}
return nil, err
}
delete(cs.channels, name)
}
// Channel does exist or has been deleted. Create now with given UID.
return cs.createChannelLocked(s, name, uid)
}

0 comments on commit bff9895

Please sign in to comment.