Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make requirement for interest on creation of ephemerals soft vs hard. #2347

Merged
merged 5 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 2 additions & 8 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,14 +605,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Check in place here for interest. Will setup properly in setLeader.
r := o.acc.sl.Match(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
// Directs can let the interest come to us eventually, but setup delete timer.
if config.Direct {
o.updateDeliveryInterest(false)
} else {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, errNoInterest
}
// Let the interest come to us eventually, but setup delete timer.
o.updateDeliveryInterest(false)
}
}
}
Expand Down
149 changes: 82 additions & 67 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,89 +329,50 @@ func TestJetStreamClusterDelete(t *testing.T) {
c := createJetStreamClusterExplicit(t, "RNS", 3)
defer c.shutdown()

s := c.randomServer()
// Client for API requests.
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

cfg := StreamConfig{
cfg := &nats.StreamConfig{
Name: "C22",
Subjects: []string{"foo", "bar", "baz"},
Replicas: 2,
Storage: FileStorage,
Storage: nats.FileStorage,
MaxMsgs: 100,
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var scResp JSApiStreamCreateResponse
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.StreamInfo == nil || scResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// Now create a consumer.
obsReq := CreateConsumerRequest{
Stream: cfg.Name,
Config: ConsumerConfig{Durable: "dlc", AckPolicy: AckExplicit},
}
req, err = json.Marshal(obsReq)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err = nc.Request(fmt.Sprintf(JSApiDurableCreateT, cfg.Name, "dlc"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var ccResp JSApiConsumerCreateResponse
if err = json.Unmarshal(resp.Data, &ccResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ccResp.ConsumerInfo == nil || ccResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", ccResp.Error)
if _, err := js.AddConsumer("C22", &nats.ConsumerConfig{
Durable: "dlc",
AckPolicy: nats.AckExplicitPolicy,
}); err != nil {
t.Fatalf("Error adding consumer: %v", err)
}

// Now delete the consumer.
resp, _ = nc.Request(fmt.Sprintf(JSApiConsumerDeleteT, cfg.Name, "dlc"), nil, time.Second)
var cdResp JSApiConsumerDeleteResponse
if err = json.Unmarshal(resp.Data, &cdResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !cdResp.Success || cdResp.Error != nil {
t.Fatalf("Got a bad response %+v", cdResp)
if err := js.DeleteConsumer("C22", "dlc"); err != nil {
t.Fatalf("Error deleting consumer: %v", err)
}

// Now delete the stream.
resp, err = nc.Request(fmt.Sprintf(JSApiStreamDeleteT, cfg.Name), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var dResp JSApiStreamDeleteResponse
if err = json.Unmarshal(resp.Data, &dResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !dResp.Success || dResp.Error != nil {
t.Fatalf("Got a bad response %+v", dResp.Error)
if err := js.DeleteStream("C22"); err != nil {
t.Fatalf("Error deleting stream: %v", err)
}

// This will get the current information about usage and limits for this account.
resp, err = nc.Request(JSApiAccountInfo, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var info JSApiAccountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if info.Streams != 0 {
t.Fatalf("Expected no remaining streams, got %d", info.Streams)
}
checkFor(t, time.Second, 15*time.Millisecond, func() error {
info, err := js.AccountInfo()
if err != nil {
return err
}
if info.Streams != 0 {
return fmt.Errorf("Expected no remaining streams, got %d", info.Streams)
}
return nil
})
}

func TestJetStreamClusterStreamPurge(t *testing.T) {
Expand Down Expand Up @@ -2351,6 +2312,47 @@ func TestJetStreamClusterInterestRetentionWithFilteredConsumers(t *testing.T) {
checkState(0)
}

func TestJetStreamClusterEphemeralConsumerNoImmediateInterest(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// We want to relax the strict interest requirement.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{DeliverSubject: "r"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

cl := c.consumerLeader("$G", "TEST", ci.Name)
mset, err := cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Expected to find a stream for %q", "TEST")
}
o := mset.lookupConsumer(ci.Name)
if o == nil {
t.Fatalf("Error looking up consumer %q", ci.Name)
}
o.setInActiveDeleteThreshold(500 * time.Millisecond)

// Make sure the consumer goes away though eventually.
// Should be 5 seconds wait.
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
if _, err := js.ConsumerInfo("TEST", ci.Name); err != nil {
return nil
}
return fmt.Errorf("Consumer still present")
})
}

func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down Expand Up @@ -3876,7 +3878,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
}

checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("Could not fetch stream info: %v", err)
}
Expand All @@ -3901,7 +3903,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {

// Now check consumer info as well.
checkFor(t, 30*time.Second, 100*time.Millisecond, func() error {
ci, err := js.ConsumerInfo("TEST", "cat")
ci, err := js.ConsumerInfo("TEST", "cat", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("Could not fetch consumer info: %v", err)
}
Expand Down Expand Up @@ -5108,6 +5110,17 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
js.Publish("TEST", []byte("Second"))
js.Publish("TEST", []byte("Third"))

checkFor(t, time.Second, 15*time.Millisecond, func() error {
ci, err := js.ConsumerInfo("TEST", "tr")
if err != nil {
return fmt.Errorf("Error getting consumer info: %v", err)
}
if ci.NumPending != 2 {
return fmt.Errorf("NumPending still not 1: %v", ci.NumPending)
}
return nil
})

// Ack across accounts.
m, err = nc.Request("$JS.API.CONSUMER.MSG.NEXT.TEST.tr", []byte("+NXT"), 2*time.Second)
if err != nil {
Expand Down Expand Up @@ -6903,7 +6916,9 @@ func TestJetStreamClusterMirrorAndSourceSubLeaks(t *testing.T) {
}

// Some subs take longer to settle out so we give ourselves a small buffer.
if deleteSubs := c.stableTotalSubs(); deleteSubs > startSubs+10 {
// There will be 1 sub for client on each server (such as _INBOX.IvVJ2DOXUotn4RUSZZCFvp.*)
// and 2 or 3 subs such as `_R_.xxxxx.>` on each server, so a total of 12 subs.
if deleteSubs := c.stableTotalSubs(); deleteSubs > startSubs+12 {
t.Fatalf("Expected subs to return to %d from a high of %d, but got %d", startSubs, afterSubs, deleteSubs)
}
}
Expand Down
23 changes: 5 additions & 18 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,10 +1208,6 @@ func TestJetStreamCreateConsumer(t *testing.T) {

// Non-Durables need to have subscription to delivery subject.
delivery := nats.NewInbox()
if _, err := mset.addConsumer(&ConsumerConfig{DeliverSubject: delivery}); err == nil {
t.Fatalf("Expected an error on unsubscribed delivery subject")
}

// Pull-based consumers are required to be durable since we do not know when they should
// be cleaned up.
if _, err := mset.addConsumer(&ConsumerConfig{AckPolicy: AckExplicit}); err == nil {
Expand Down Expand Up @@ -7599,27 +7595,18 @@ func TestJetStreamRequestAPI(t *testing.T) {
if err = json.Unmarshal(resp.Data, &ccResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkNatsError(t, ccResp.Error, JSConsumerCreateErrF)

// Now create subscription and make sure we get proper response.
sub, _ := nc.SubscribeSync(delivery)
nc.Flush()

resp, err = nc.Request(fmt.Sprintf(JSApiConsumerCreateT, msetCfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ccResp.Error, ccResp.ConsumerInfo = nil, nil
if err = json.Unmarshal(resp.Data, &ccResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Ephemerals are now not rejected when there is no interest.
if ccResp.ConsumerInfo == nil || ccResp.Error != nil {
t.Fatalf("Got a bad response %+v", ccResp)
}
if time.Since(ccResp.Created) > time.Second {
t.Fatalf("Created time seems wrong: %v\n", ccResp.Created)
}

// Now create subscription and make sure we get proper response.
sub, _ := nc.SubscribeSync(delivery)
nc.Flush()

checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)
Expand Down