[FIXED] Cluster: maintain sub's ID on replay
When a server in the cluster restarts and replays the subscription
create event, a new subscription ID would be assigned to it. This is
not a problem per se, but it could make debugging using logs a bit
more challenging. Ensure that we keep the same subscription ID for a
given subscription regardless of cluster restarts/snapshots.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Apr 15, 2019
1 parent 766681d commit d7a392c
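For context, here is a minimal, self-contained sketch of the mechanism this
commit introduces (the names subscribeOp and channelState are illustrative
stand-ins, not the server's actual spb.AddSubscription and channel types):
the leader reads a per-channel counter and embeds the resulting ID in the
replicated operation, and every node assigns that same ID and bumps its
counter past it, whether it applies the operation live or replays the Raft
log after a restart.

package main

import (
	"fmt"
	"sync"
)

// subscribeOp stands in for the replicated subscribe operation: the leader
// picks the ID and it travels with the Raft log entry.
type subscribeOp struct {
	Subject string
	ID      uint64
}

// channelState stands in for the per-channel state; nextSubID mirrors the
// channel.nextSubID counter added by this commit.
type channelState struct {
	mu        sync.Mutex
	nextSubID uint64
	subs      map[uint64]string // sub ID -> ack inbox (illustrative)
}

// On the leader: read the next ID before replicating, so that followers and
// any later replay of the log see the same value.
func (c *channelState) newSubscribeOp(subject string) subscribeOp {
	c.mu.Lock()
	id := c.nextSubID
	c.mu.Unlock()
	return subscribeOp{Subject: subject, ID: id}
}

// On apply/replay: honor the replicated ID and advance the counter past it,
// so that freshly created subscriptions never reuse it.
func (c *channelState) apply(op subscribeOp, ackInbox string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.subs[op.ID] = ackInbox
	if op.ID >= c.nextSubID {
		c.nextSubID = op.ID + 1
	}
}

func main() {
	c := &channelState{nextSubID: 1, subs: map[uint64]string{}}
	op := c.newSubscribeOp("foo")
	c.apply(op, "_INBOX.live")   // live apply assigns ID 1
	c.apply(op, "_INBOX.replay") // replaying the same entry keeps ID 1
	fmt.Println(c.nextSubID)     // 2: the next subscription gets a fresh ID
}

The actual changes below thread the ID through replicateSub and processSub,
and persist the counter in snapshots so that it also survives log compaction.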
Showing 7 changed files with 869 additions and 591 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -15,11 +15,12 @@ services:
- mysql
before_script:
- EXCLUDE_VENDOR=$(go list ./... | grep -v "/vendor/")
- EXCLUDE_VENDOR_AND_PROTO_DIR=$(go list ./... | grep -v "/vendor/" | grep -v "/spb")
- go install
- $(exit $(go fmt $EXCLUDE_VENDOR | wc -l))
- go vet $EXCLUDE_VENDOR
- $(exit $(misspell -locale US . | grep -v "vendor/" | wc -l))
- staticcheck -ignore "$(cat staticcheck.ignore)" $EXCLUDE_VENDOR
- staticcheck -ignore "$(cat staticcheck.ignore)" $EXCLUDE_VENDOR_AND_PROTO_DIR
script:
- mysql -u root -e "CREATE USER 'nss'@'localhost' IDENTIFIED BY 'password'; GRANT ALL PRIVILEGES ON *.* TO 'nss'@'localhost'; CREATE DATABASE test_nats_streaming;"
- go test -i $EXCLUDE_VENDOR
2 changes: 1 addition & 1 deletion server/clustering.go
@@ -495,7 +495,7 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
return s.closeClient(op.ClientDisconnect.ClientID)
case spb.RaftOperation_Subscribe:
// Subscription replication.
sub, err := s.processSub(nil, op.Sub.Request, op.Sub.AckInbox)
sub, err := s.processSub(nil, op.Sub.Request, op.Sub.AckInbox, op.Sub.ID)
return &replicatedSub{sub: sub, err: err}
case spb.RaftOperation_RemoveSubscription:
fallthrough
162 changes: 162 additions & 0 deletions server/clustering_test.go
@@ -4531,3 +4531,165 @@ func TestClusteringInstallSnapshotFailure(t *testing.T) {
s5.Shutdown()
remaining.Shutdown()
}

func TestClusteringKeepSubIDOnReplay(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

leader := getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

waitForNumSubs(t, leader, clientName, 2)

subs := leader.clients.getSubs(clientName)

subsMap := map[uint64]string{}
for _, sub := range subs {
sub.RLock()
subsMap[sub.ID] = sub.Inbox
sub.RUnlock()
}

// Shutdown the cluster and restart it.
s2.Shutdown()
s1.Shutdown()

s1 = runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()
s2 = runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

leader = getLeader(t, 10*time.Second, s1, s2)

checkSubIDsAfterRestart := func(t *testing.T, leader *StanServer) {
t.Helper()
subs = leader.clients.getSubs(clientName)
for _, sub := range subs {
sub.RLock()
id := sub.ID
ibx := sub.Inbox
sub.RUnlock()

mibx, ok := subsMap[id]
if !ok {
t.Fatalf("Sub.ID %v is new", id)
} else {
if ibx != mibx {
t.Fatalf("Sub.ID %v's inbox should be %v, got %v", id, mibx, ibx)
}
}
}
}
checkSubIDsAfterRestart(t, leader)

// Create a new subscription, ensure it does not reuse same subID.
sub3, err := sc.Subscribe("foo", func(_ *stan.Msg) {})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
subs = leader.clients.getSubs(clientName)
var newID uint64
var maxSubID uint64
for _, sub := range subs {
sub.RLock()
id := sub.ID
ibx := sub.Inbox
sub.RUnlock()

mibx, ok := subsMap[id]
if ok {
if ibx != mibx {
t.Fatalf("Sub.ID %v's inbox should be %v, got %v", id, mibx, ibx)
}
if id > maxSubID {
maxSubID = id
}
} else {
newID = id
}
}
if newID <= maxSubID {
t.Fatalf("Max subID for existing subscriptions was %v, new ID is: %v", maxSubID, newID)
}
sub3.Close()

waitForNumSubs(t, leader, clientName, 2)

if err := s1.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}
if err := s2.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}

// Shutdown the cluster and restart it.
s2.Shutdown()
s1.Shutdown()

s1 = runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()
s2 = runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

leader = getLeader(t, 10*time.Second, s1, s2)
checkSubIDsAfterRestart(t, leader)

// During snapshot, we should have stored the max sub ID, so we should not
// be reusing sub3's ID.
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
subs = leader.clients.getSubs(clientName)
var newNewID uint64
maxSubID = 0
for _, sub := range subs {
sub.RLock()
id := sub.ID
ibx := sub.Inbox
sub.RUnlock()

mibx, ok := subsMap[id]
if ok {
if ibx != mibx {
t.Fatalf("Sub.ID %v's inbox should be %v, got %v", id, mibx, ibx)
}
if id > maxSubID {
maxSubID = id
}
} else {
newNewID = id
}
}
if newNewID <= maxSubID {
t.Fatalf("Max subID for existing subscriptions was %v, new ID is: %v", maxSubID, newID)
}
if newNewID <= newID {
t.Fatalf("subID is less or equal to the last deleted subscription prev=%v last=%v", newID, newNewID)
}
sc.Close()
}
38 changes: 32 additions & 6 deletions server/server.go
@@ -337,7 +337,7 @@ func (cs *channelStore) createChannelLocked(s *StanServer, name string) (*channe
// low-level creation and storage in memory of a *channel
// Lock is held on entry or not needed.
func (cs *channelStore) create(s *StanServer, name string, sc *stores.Channel) (*channel, error) {
c := &channel{name: name, store: sc, ss: s.createSubStore(), stan: s}
c := &channel{name: name, store: sc, ss: s.createSubStore(), stan: s, nextSubID: 1}
lastSequence, err := c.store.Msgs.LastSequence()
if err != nil {
return nil, err
@@ -429,6 +429,7 @@ type channel struct {
lTimestamp int64
stan *StanServer
activity *channelActivity
nextSubID uint64
}

type channelActivity struct {
@@ -803,8 +804,18 @@ func (ss *subStore) Store(sub *subState) error {
// Adds to storage.
// Use sub lock to avoid race with waitForAcks in some tests
sub.Lock()
// In cluster mode (after 0.12.2), we need to restore sub.ID to the
// value it had prior to the CreateSub call (overwriting anything
// that the backend store may have set).
subID := sub.ID
err := sub.store.CreateSub(&sub.SubState)
if err == nil && subID > 0 {
sub.ID = subID
}
sub.Unlock()
if err == nil {
err = sub.store.Flush()
}
if err != nil {
ss.stan.log.Errorf("Unable to store subscription [%v:%v] on [%s]: %v", sub.ClientID, sub.Inbox, sub.subject, err)
return err
@@ -4313,12 +4324,13 @@ func durableKey(sr *pb.SubscriptionRequest) string {

// replicateSub replicates the SubscriptionRequest to nodes in the cluster via
// Raft.
func (s *StanServer) replicateSub(sr *pb.SubscriptionRequest, ackInbox string) (*subState, error) {
func (s *StanServer) replicateSub(sr *pb.SubscriptionRequest, ackInbox string, subID uint64) (*subState, error) {
op := &spb.RaftOperation{
OpType: spb.RaftOperation_Subscribe,
Sub: &spb.AddSubscription{
Request: sr,
AckInbox: ackInbox,
ID: subID,
},
}
data, err := op.Marshal()
@@ -4377,7 +4389,7 @@ func (s *StanServer) updateDurable(ss *subStore, sub *subState) error {
}

// processSub adds the subscription to the server.
func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox string) (*subState, error) {
func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox string, subID uint64) (*subState, error) {
// If channel not provided, we have to look it up
var err error
if c == nil {
@@ -4504,8 +4516,19 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
}

if err == nil {
// add the subscription to stan
// add the subscription to stan.
// In cluster mode, the server decides the subscription ID
// (so that subscriptions have the same ID on replay). So
// set it prior to this call.
sub.ID = subID
err = s.addSubscription(ss, sub)
if err == nil && subID > 0 {
ss.Lock()
if subID >= c.nextSubID {
c.nextSubID = subID + 1
}
ss.Unlock()
}
}
}
if err == nil && (!s.isClustered || s.isLeader()) {
@@ -4644,10 +4667,13 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
}
}
if err == nil {
sub, err = s.replicateSub(sr, ackInbox)
c.ss.Lock()
subID := c.nextSubID
c.ss.Unlock()
sub, err = s.replicateSub(sr, ackInbox, subID)
}
} else {
sub, err = s.processSub(c, sr, ackInbox)
sub, err = s.processSub(c, sr, ackInbox, 0)
}
}
if err != nil {
19 changes: 15 additions & 4 deletions server/snapshot.go
@@ -146,12 +146,13 @@ func (s *serverSnapshot) snapshotChannels(snap *spb.RaftSnapshot) error {
if err != nil {
return err
}
c.ss.RLock()
snapChannel := &spb.ChannelSnapshot{
Channel: c.name,
First: first,
Last: last,
Channel: c.name,
First: first,
Last: last,
NextSubID: c.nextSubID,
}
c.ss.RLock()

// Start with count of all plain subs...
snapSubs := make([]*spb.SubscriptionSnapshot, len(c.ss.psubs))
@@ -324,7 +325,17 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe
}
for _, ss := range sc.Subscriptions {
s.recoverOneSub(c, ss.State, nil, ss.AcksPending)
c.ss.Lock()
if ss.State.ID >= c.nextSubID {
c.nextSubID = ss.State.ID + 1
}
c.ss.Unlock()
}
c.ss.Lock()
if sc.NextSubID > c.nextSubID {
c.nextSubID = sc.NextSubID
}
c.ss.Unlock()
}
if !inNewRaftCall {
// Now delete channels that we had before the restore.
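To see why the snapshot records NextSubID instead of relying only on the
highest recovered subscription ID, consider the sequence the new test
exercises (assuming IDs are handed out starting at 1, as the nextSubID
counter does): subscriptions 1 and 2 are created, a third subscription
receives ID 3 and is then closed, and a snapshot is taken. Only IDs 1 and 2
survive in that snapshot, so restoring from the recovered subscriptions alone
would reset the counter to 3 and hand out a previously used ID; because the
snapshot also carries NextSubID = 4, the first subscription created after the
restore gets ID 4.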
