Skip to content

Commit

Permalink
Merge 8ceaf28 into 2e8a792
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Apr 24, 2019
2 parents 2e8a792 + 8ceaf28 commit eb2ad6a
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 20 deletions.
58 changes: 58 additions & 0 deletions server/clustering_test.go
Expand Up @@ -5001,3 +5001,61 @@ func TestClusteringKeepSubIDOnReplay(t *testing.T) {
}
sc.Close()
}

func TestClusteringNoIncorrectMaxSubs(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.MaxSubscriptions = 2
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.MaxSubscriptions = 2
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

sc.Close()

s1.Shutdown()
s2.Shutdown()

s1 = runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

s2 = runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc = NewDefaultConnection(t)
defer sc.Close()

if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

sc.Close()
s1.Shutdown()
}
13 changes: 2 additions & 11 deletions server/server.go
Expand Up @@ -47,7 +47,7 @@ import (
// Server defaults.
const (
// VERSION is the current version for the NATS Streaming server.
VERSION = "0.14.0"
VERSION = "0.14.1"

DefaultClusterID = "test-cluster"
DefaultDiscoverPrefix = "_STAN.discover"
Expand Down Expand Up @@ -803,20 +803,11 @@ func (ss *subStore) Store(sub *subState) error {
if sub == nil {
return nil
}
// `sub` has just been created and can't be referenced anywhere else in
// the code, so we don't need locking.

// Adds to storage.
// Use sub lock to avoid race with waitForAcks in some tests
sub.Lock()
// In cluster mode (after 0.12.2), we need to set the sub.ID to
// what is set prior to the call (overwrite anything that is set
// by the backend store).
subID := sub.ID
err := sub.store.CreateSub(&sub.SubState)
if err == nil && subID > 0 {
sub.ID = subID
}
sub.Unlock()
if err == nil {
err = sub.store.Flush()
Expand Down Expand Up @@ -1567,7 +1558,7 @@ func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) (newServer *
// Wrap our store with a RaftStore instance that avoids persisting
// data that we don't need because they are handled by the actual
// raft logs.
store = stores.NewRaftStore(store)
store = stores.NewRaftStore(s.log, store, storeLimits)
}
if sOpts.Encrypt || len(sOpts.EncryptionKey) > 0 {
// In clustering mode, RAFT is using its own logs (not the one above),
Expand Down
6 changes: 6 additions & 0 deletions stores/common_sub_test.go
Expand Up @@ -55,6 +55,7 @@ func TestCSMaxSubs(t *testing.T) {
var err error
lastSubID := uint64(0)
for i := 0; i < total; i++ {
sub.ID = 0
err = cs.Subs.CreateSub(sub)
if err != nil {
break
Expand All @@ -80,10 +81,12 @@ func TestCSMaxSubs(t *testing.T) {
}
// Now try to add back 2 subscriptions...
// First should be fine
sub.ID = 0
if err := cs.Subs.CreateSub(sub); err != nil {
t.Fatalf("Error on create: %v", err)
}
// This one should fail:
sub.ID = 0
if err := cs.Subs.CreateSub(sub); err == nil || err != ErrTooManySubs {
t.Fatalf("Error should have been ErrTooManySubs, got %v", err)
}
Expand Down Expand Up @@ -145,6 +148,7 @@ func TestCSBasicSubStore(t *testing.T) {
}

// Create a new subscription, make sure subID is not reused
sub.ID = 0
err = ss.CreateSub(sub)
if err != nil {
t.Fatalf("Error on create sub: %v", err)
Expand All @@ -155,6 +159,7 @@ func TestCSBasicSubStore(t *testing.T) {
subToDelete := sub.ID
subID = sub.ID
// Create another
sub.ID = 0
err = ss.CreateSub(sub)
if err != nil {
t.Fatalf("Error on create sub: %v", err)
Expand All @@ -169,6 +174,7 @@ func TestCSBasicSubStore(t *testing.T) {
}
// Create a last one and make sure it does not collide with the
// second sub we created.
sub.ID = 0
err = ss.CreateSub(sub)
if err != nil {
t.Fatalf("Error on create sub: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion stores/common_test.go
Expand Up @@ -519,7 +519,7 @@ func TestCSInit(t *testing.T) {
case TypeRaft:
s, err = NewFileStore(testLogger, testRSDefaultDatastore, nil)
if err == nil {
s = NewRaftStore(s)
s = NewRaftStore(testLogger, s, nil)
}
default:
panic(fmt.Errorf("Add store type %q in this test", st.name))
Expand Down
72 changes: 67 additions & 5 deletions stores/raftstore.go
Expand Up @@ -16,6 +16,7 @@ package stores
import (
"sync"

"github.com/nats-io/nats-streaming-server/logger"
"github.com/nats-io/nats-streaming-server/spb"
)

Expand All @@ -27,16 +28,17 @@ import (
type RaftStore struct {
sync.Mutex
Store
log logger.Logger
}

// RaftSubStore implements the SubStore interface
type RaftSubStore struct {
SubStore
genericSubStore
}

// NewRaftStore returns an instarce of a RaftStore
func NewRaftStore(s Store) *RaftStore {
return &RaftStore{Store: s}
func NewRaftStore(log logger.Logger, s Store, limits *StoreLimits) *RaftStore {
return &RaftStore{Store: s, log: log}
}

////////////////////////////////////////////////////////////////////////////
Expand All @@ -51,10 +53,22 @@ func (s *RaftStore) CreateChannel(channel string) (*Channel, error) {
if err != nil {
return nil, err
}
c.Subs = &RaftSubStore{SubStore: c.Subs}
c.Subs = s.replaceSubStore(channel, c.Subs, 0)
return c, nil
}

func (s *RaftStore) replaceSubStore(channel string, realSubStore SubStore, maxSubID uint64) *RaftSubStore {
// Close underlying sub store.
realSubStore.Close()
// We need the subs limits for this channel
cl := s.Store.GetChannelLimits(channel)
// Create and initialize our sub store.
rss := &RaftSubStore{}
rss.init(s.log, &cl.SubStoreLimits)
rss.maxSubID = maxSubID
return rss
}

// Name implements the Store interface
func (s *RaftStore) Name() string {
return TypeRaft + "_" + s.Store.Name()
Expand All @@ -69,7 +83,16 @@ func (s *RaftStore) Recover() (*RecoveredState, error) {
return nil, err
}
if state != nil {
for _, rc := range state.Channels {
for channel, rc := range state.Channels {
// Note that this is when recovering the underlying sub store
// that would be the case for a RaftSubStore prior to 0.14.1
var maxSubID uint64
for _, rs := range rc.Subscriptions {
if rs.Sub.ID > maxSubID {
maxSubID = rs.Sub.ID
}
}
rc.Channel.Subs = s.replaceSubStore(channel, rc.Channel.Subs, maxSubID)
rc.Subscriptions = nil
}
state.Clients = nil
Expand All @@ -93,6 +116,45 @@ func (s *RaftStore) DeleteClient(clientID string) error {
// RaftSubStore methods
////////////////////////////////////////////////////////////////////////////

// CreateSub implements the SubStore interface
func (ss *RaftSubStore) CreateSub(sub *spb.SubState) error {
gss := &ss.genericSubStore

gss.Lock()
defer gss.Unlock()

// This store does not persist subscriptions, since it is done
// in the actual RAFT log. This is just a wrapper to the streaming
// sub store. We still need to apply limits.

// If sub.ID is provided, check if already present, in which case
// don't check limit.
if sub.ID > 0 {
if _, ok := gss.subs[sub.ID]; ok {
return nil
}
}
// Check limits
if gss.limits.MaxSubscriptions > 0 && len(gss.subs) >= gss.limits.MaxSubscriptions {
return ErrTooManySubs
}

// With new server, the sub.ID is set before this call is invoked,
// and if that is the case, this is what we use. But let's support
// not having one (in case we recover an existing store, or run a
// mix of servers with different versions where the leader would
// not be at a version that sets the sub.ID).
if sub.ID == 0 {
gss.maxSubID++
sub.ID = gss.maxSubID
} else if sub.ID > gss.maxSubID {
gss.maxSubID = sub.ID
}
gss.subs[sub.ID] = emptySub

return nil
}

// UpdateSub implements the SubStore interface
func (ss *RaftSubStore) UpdateSub(*spb.SubState) error {
// Make this a no-op
Expand Down

0 comments on commit eb2ad6a

Please sign in to comment.