Skip to content

Commit

Permalink
Merge pull request #2347 from nats-io/econsumer
Browse files Browse the repository at this point in the history
Make requirement for interest on creation of ephemerals soft vs hard.
  • Loading branch information
kozlovic committed Jul 6, 2021
2 parents eca1629 + b582b13 commit 315e658
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 93 deletions.
10 changes: 2 additions & 8 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,14 +605,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Check in place here for interest. Will setup properly in setLeader.
r := o.acc.sl.Match(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
// Directs can let the interest come to us eventually, but setup delete timer.
if config.Direct {
o.updateDeliveryInterest(false)
} else {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, errNoInterest
}
// Let the interest come to us eventually, but setup delete timer.
o.updateDeliveryInterest(false)
}
}
}
Expand Down
149 changes: 82 additions & 67 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,89 +329,50 @@ func TestJetStreamClusterDelete(t *testing.T) {
c := createJetStreamClusterExplicit(t, "RNS", 3)
defer c.shutdown()

s := c.randomServer()
// Client for API requests.
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

cfg := StreamConfig{
cfg := &nats.StreamConfig{
Name: "C22",
Subjects: []string{"foo", "bar", "baz"},
Replicas: 2,
Storage: FileStorage,
Storage: nats.FileStorage,
MaxMsgs: 100,
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var scResp JSApiStreamCreateResponse
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.StreamInfo == nil || scResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// Now create a consumer.
obsReq := CreateConsumerRequest{
Stream: cfg.Name,
Config: ConsumerConfig{Durable: "dlc", AckPolicy: AckExplicit},
}
req, err = json.Marshal(obsReq)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err = nc.Request(fmt.Sprintf(JSApiDurableCreateT, cfg.Name, "dlc"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var ccResp JSApiConsumerCreateResponse
if err = json.Unmarshal(resp.Data, &ccResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ccResp.ConsumerInfo == nil || ccResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", ccResp.Error)
if _, err := js.AddConsumer("C22", &nats.ConsumerConfig{
Durable: "dlc",
AckPolicy: nats.AckExplicitPolicy,
}); err != nil {
t.Fatalf("Error adding consumer: %v", err)
}

// Now delete the consumer.
resp, _ = nc.Request(fmt.Sprintf(JSApiConsumerDeleteT, cfg.Name, "dlc"), nil, time.Second)
var cdResp JSApiConsumerDeleteResponse
if err = json.Unmarshal(resp.Data, &cdResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !cdResp.Success || cdResp.Error != nil {
t.Fatalf("Got a bad response %+v", cdResp)
if err := js.DeleteConsumer("C22", "dlc"); err != nil {
t.Fatalf("Error deleting consumer: %v", err)
}

// Now delete the stream.
resp, err = nc.Request(fmt.Sprintf(JSApiStreamDeleteT, cfg.Name), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var dResp JSApiStreamDeleteResponse
if err = json.Unmarshal(resp.Data, &dResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !dResp.Success || dResp.Error != nil {
t.Fatalf("Got a bad response %+v", dResp.Error)
if err := js.DeleteStream("C22"); err != nil {
t.Fatalf("Error deleting stream: %v", err)
}

// This will get the current information about usage and limits for this account.
resp, err = nc.Request(JSApiAccountInfo, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var info JSApiAccountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if info.Streams != 0 {
t.Fatalf("Expected no remaining streams, got %d", info.Streams)
}
checkFor(t, time.Second, 15*time.Millisecond, func() error {
info, err := js.AccountInfo()
if err != nil {
return err
}
if info.Streams != 0 {
return fmt.Errorf("Expected no remaining streams, got %d", info.Streams)
}
return nil
})
}

func TestJetStreamClusterStreamPurge(t *testing.T) {
Expand Down Expand Up @@ -2351,6 +2312,47 @@ func TestJetStreamClusterInterestRetentionWithFilteredConsumers(t *testing.T) {
checkState(0)
}

func TestJetStreamClusterEphemeralConsumerNoImmediateInterest(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// We want to relax the strict interest requirement.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{DeliverSubject: "r"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

cl := c.consumerLeader("$G", "TEST", ci.Name)
mset, err := cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Expected to find a stream for %q", "TEST")
}
o := mset.lookupConsumer(ci.Name)
if o == nil {
t.Fatalf("Error looking up consumer %q", ci.Name)
}
o.setInActiveDeleteThreshold(500 * time.Millisecond)

// Make sure the consumer goes away though eventually.
// Should be 5 seconds wait.
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
if _, err := js.ConsumerInfo("TEST", ci.Name); err != nil {
return nil
}
return fmt.Errorf("Consumer still present")
})
}

func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down Expand Up @@ -3876,7 +3878,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {
}

checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("Could not fetch stream info: %v", err)
}
Expand All @@ -3901,7 +3903,7 @@ func TestJetStreamClusterRemovePeer(t *testing.T) {

// Now check consumer info as well.
checkFor(t, 30*time.Second, 100*time.Millisecond, func() error {
ci, err := js.ConsumerInfo("TEST", "cat")
ci, err := js.ConsumerInfo("TEST", "cat", nats.MaxWait(time.Second))
if err != nil {
return fmt.Errorf("Could not fetch consumer info: %v", err)
}
Expand Down Expand Up @@ -5108,6 +5110,17 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
js.Publish("TEST", []byte("Second"))
js.Publish("TEST", []byte("Third"))

checkFor(t, time.Second, 15*time.Millisecond, func() error {
ci, err := js.ConsumerInfo("TEST", "tr")
if err != nil {
return fmt.Errorf("Error getting consumer info: %v", err)
}
if ci.NumPending != 2 {
return fmt.Errorf("NumPending still not 1: %v", ci.NumPending)
}
return nil
})

// Ack across accounts.
m, err = nc.Request("$JS.API.CONSUMER.MSG.NEXT.TEST.tr", []byte("+NXT"), 2*time.Second)
if err != nil {
Expand Down Expand Up @@ -6903,7 +6916,9 @@ func TestJetStreamClusterMirrorAndSourceSubLeaks(t *testing.T) {
}

// Some subs take longer to settle out so we give ourselves a small buffer.
if deleteSubs := c.stableTotalSubs(); deleteSubs > startSubs+10 {
// There will be 1 sub for client on each server (such as _INBOX.IvVJ2DOXUotn4RUSZZCFvp.*)
// and 2 or 3 subs such as `_R_.xxxxx.>` on each server, so a total of 12 subs.
if deleteSubs := c.stableTotalSubs(); deleteSubs > startSubs+12 {
t.Fatalf("Expected subs to return to %d from a high of %d, but got %d", startSubs, afterSubs, deleteSubs)
}
}
Expand Down
23 changes: 5 additions & 18 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,10 +1208,6 @@ func TestJetStreamCreateConsumer(t *testing.T) {

// Non-Durables need to have subscription to delivery subject.
delivery := nats.NewInbox()
if _, err := mset.addConsumer(&ConsumerConfig{DeliverSubject: delivery}); err == nil {
t.Fatalf("Expected an error on unsubscribed delivery subject")
}

// Pull-based consumers are required to be durable since we do not know when they should
// be cleaned up.
if _, err := mset.addConsumer(&ConsumerConfig{AckPolicy: AckExplicit}); err == nil {
Expand Down Expand Up @@ -7599,27 +7595,18 @@ func TestJetStreamRequestAPI(t *testing.T) {
if err = json.Unmarshal(resp.Data, &ccResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkNatsError(t, ccResp.Error, JSConsumerCreateErrF)

// Now create subscription and make sure we get proper response.
sub, _ := nc.SubscribeSync(delivery)
nc.Flush()

resp, err = nc.Request(fmt.Sprintf(JSApiConsumerCreateT, msetCfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ccResp.Error, ccResp.ConsumerInfo = nil, nil
if err = json.Unmarshal(resp.Data, &ccResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Ephemerals are now not rejected when there is no interest.
if ccResp.ConsumerInfo == nil || ccResp.Error != nil {
t.Fatalf("Got a bad response %+v", ccResp)
}
if time.Since(ccResp.Created) > time.Second {
t.Fatalf("Created time seems wrong: %v\n", ccResp.Created)
}

// Now create subscription and make sure we get proper response.
sub, _ := nc.SubscribeSync(delivery)
nc.Flush()

checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)
Expand Down

0 comments on commit 315e658

Please sign in to comment.