
Commit

Merge be195d3 into 4ddbef0
kozlovic committed Jan 5, 2021
2 parents 4ddbef0 + be195d3 commit fa4ade9
Showing 6 changed files with 187 additions and 0 deletions.
50 changes: 50 additions & 0 deletions server/clustering_test.go
@@ -8031,3 +8031,53 @@ func TestClusteringAddRemoveClusterNodesWithBootstrap(t *testing.T) {
}
}
}

func TestClusteringDurableReplaced(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.ReplaceDurable = true
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Wait for it to bootstrap
getLeader(t, 10*time.Second, s1)

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.ReplaceDurable = true
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.ReplaceDurable = true
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

testDurableReplaced(t, s1)

s1.Shutdown()
newLeader := getLeader(t, 2*time.Second, s2, s3)
c, err := newLeader.lookupOrCreateChannel("foo")
if err != nil {
t.Fatalf("Error looking up channel: %v", err)
}
if subs := c.ss.getAllSubs(); len(subs) != 0 {
t.Fatalf("Expected 0 sub, got %v", len(subs))
}
c.ss.RLock()
lenDur := len(c.ss.durables)
c.ss.RUnlock()
if lenDur != 1 {
t.Fatalf("Expected 1 durable, got %v", lenDur)
}
}
5 changes: 5 additions & 0 deletions server/conf.go
@@ -199,6 +199,11 @@ func ProcessConfigFile(configFile string, opts *Options) error {
return err
}
opts.NKeySeedFile = v.(string)
case "replace_durable", "replace_durables", "replace_duplicate_durable", "replace_duplicate_durables":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.ReplaceDurable = v.(bool)
}
}
return nil
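The new case accepts four equivalent key names, each requiring a boolean value. As a minimal sketch of how the option could appear in a NATS Streaming configuration file (the surrounding file content is illustrative):

# Replace an existing durable subscription instead of returning a
# "duplicate durable" error. The aliases replace_durables,
# replace_duplicate_durable and replace_duplicate_durables are also accepted.
replace_durable: true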
1 change: 1 addition & 0 deletions server/conf_test.go
@@ -440,6 +440,7 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "ft_group: 123", wrongTypeErr)
expectFailureFor(t, "partitioning: 123", wrongTypeErr)
expectFailureFor(t, "syslog_name: 123", wrongTypeErr)
expectFailureFor(t, "replace_durable: 123", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_channels:false}", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_msgs:false}", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_bytes:false}", wrongTypeErr)
33 changes: 33 additions & 0 deletions server/partitions_test.go
@@ -1027,3 +1027,36 @@ func TestPartitionsCleanInvalidConns(t *testing.T) {
t.Fatalf("Should not be more than %v, got %v", maxKnownInvalidConns, mlen)
}
}

func TestPartitionsDurableReplaced(t *testing.T) {
setPartitionsVarsForTest()
defer resetDefaultPartitionsVars()

clientCheckTimeout = 150 * time.Millisecond
defer func() { clientCheckTimeout = defaultClientCheckTimeout }()

// For this test, create a single NATS server to which both servers connect.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

fooSubj := "foo"
barSubj := "bar"

opts1 := GetDefaultOptions()
opts1.NATSServerURL = "nats://127.0.0.1:4222"
opts1.Partitioning = true
opts1.ReplaceDurable = true
opts1.StoreLimits.AddPerChannel(fooSubj, &stores.ChannelLimits{})
s1 := runServerWithOpts(t, opts1, nil)
defer s1.Shutdown()

opts2 := GetDefaultOptions()
opts2.NATSServerURL = "nats://127.0.0.1:4222"
opts2.Partitioning = true
opts2.ReplaceDurable = true
opts2.StoreLimits.AddPerChannel(barSubj, &stores.ChannelLimits{})
s2 := runServerWithOpts(t, opts2, nil)
defer s2.Shutdown()

testDurableReplaced(t, s1)
}
47 changes: 47 additions & 0 deletions server/server.go
@@ -1337,6 +1337,7 @@ type Options struct {
EncryptionKey []byte // Encryption key. The environment NATS_STREAMING_ENCRYPTION_KEY takes precedence and is the preferred way to provide the key.
Clustering ClusteringOptions
NATSClientOpts []nats.Option
ReplaceDurable bool // If true, the subscription request for a durable subscription will replace the current durable instead of failing with a duplicate durable error.
}

// Clone returns a deep copy of the Options object.
@@ -5109,6 +5110,11 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
// until we are done with this subscription. This will also stop
// the delete timer if one was set.
c, preventDelete, err := s.lookupOrCreateChannelPreventDelete(sr.Subject)
// For durable subscriptions, if allowed and this request would be a
// duplicate, close the current durable and accept the new one.
if err == nil && sr.DurableName != "" && sr.QGroup == "" && s.opts.ReplaceDurable {
err = s.closeDurableIfDuplicate(c, sr)
}
if err == nil {
// If clustered, thread operations through Raft.
if s.isClustered {
@@ -5173,6 +5179,47 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
s.subStartCh <- &subStartInfo{c: c, sub: sub, qs: qs, isDurable: sub.IsDurable}
}

// This closes (and replicates the close operation if running in cluster mode)
// the current durable subscription matching this subscription request.
// It should be invoked only when the ReplaceDurable option is enabled.
// It allows users to "resend" a subscription request when the original
// request failed, due to a timeout for instance. It could be that the server
// accepted the original request and sent the response back, but the client
// library gave up on it due to its own timeout. In that case, issuing the
// same subscription request again would lead to a "duplicate durable" error
// and the only choice would be to close the connection.
func (s *StanServer) closeDurableIfDuplicate(c *channel, sr *pb.SubscriptionRequest) error {
var duplicate bool
var ackInbox string
ss := c.ss
ss.RLock()
sub := ss.durables[durableKey(sr)]
if sub != nil {
sub.RLock()
duplicate = sub.ClientID != ""
ackInbox = sub.AckInbox
sub.RUnlock()
}
ss.RUnlock()
if !duplicate {
return nil
}
creq := &pb.UnsubscribeRequest{
ClientID: sr.ClientID,
Subject: sr.Subject,
Inbox: ackInbox,
}
var err error
if s.isClustered {
err = s.replicateCloseSubscription(creq)
} else {
s.closeMu.Lock()
err = s.unsubscribe(creq, true)
s.closeMu.Unlock()
}
return err
}

type subStateTraceCtx struct {
clientID string
isRemove bool
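As the comment on closeDurableIfDuplicate above describes, the option is aimed at clients that re-issue a durable subscription request, for example after a request timeout. The following is a minimal client-side sketch using the Go stan.go client; the cluster ID, client ID, subject and durable name are illustrative, and it assumes the server was started with ReplaceDurable enabled:

package main

import (
	"log"

	"github.com/nats-io/stan.go"
)

func main() {
	// Assumes a NATS Streaming server running with ReplaceDurable enabled.
	sc, err := stan.Connect("test-cluster", "client-123")
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	cb := func(m *stan.Msg) { log.Printf("received: %s", m.Data) }

	// Start a durable subscription.
	if _, err := sc.Subscribe("foo", cb, stan.DurableName("dur")); err != nil {
		log.Fatal(err)
	}

	// Re-issuing the same durable subscription would normally fail with a
	// "duplicate durable" error. With ReplaceDurable enabled, the server
	// closes the existing durable and accepts this new one instead.
	if _, err := sc.Subscribe("foo", cb, stan.DurableName("dur")); err != nil {
		log.Fatal(err)
	}
}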
51 changes: 51 additions & 0 deletions server/server_durable_test.go
@@ -16,6 +16,7 @@ package server
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

@@ -918,3 +919,53 @@ func TestPersistentStoreDurableClosedStatusOnRestart(t *testing.T) {
// Restart one last time
dur = restartDurable()
}

func TestDurableReplaced(t *testing.T) {
opts := GetDefaultOptions()
opts.ReplaceDurable = true
s := runServerWithOpts(t, opts, nil)
defer s.Shutdown()

testDurableReplaced(t, s)
}

func testDurableReplaced(t *testing.T, s *StanServer) {
sc := NewDefaultConnection(t)
defer sc.Close()

count := int32(0)
cb := func(_ *stan.Msg) {
atomic.AddInt32(&count, 1)
}
dur1, err := sc.Subscribe("foo", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Unable to start durable: %v", err)
}

dur2, err := sc.Subscribe("foo", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Unable to start durable: %v", err)
}

sc.Publish("foo", []byte("msg"))
time.Sleep(150 * time.Millisecond)

if c := atomic.LoadInt32(&count); c != 1 {
t.Fatalf("Received %v messages!", c)
}

c, err := s.lookupOrCreateChannel("foo")
if err != nil {
t.Fatalf("Error on lookup: %v", err)
}
if subs := c.ss.getAllSubs(); len(subs) > 1 {
t.Fatalf("Should have only 1 sub, got %v", len(subs))
}

if err := dur1.Close(); err == nil {
t.Fatal("Expected error on close of dur1, but did not get one")
}
if err := dur2.Close(); err != nil {
t.Fatalf("Error on close: %v", err)
}
}
