diff --git a/server/jetstream_test.go b/server/jetstream_test.go index be2509e406..611aeb59d8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20282,3 +20282,86 @@ func TestJetStreamMaxBytesIgnored(t *testing.T) { require_NoError(t, err) require_True(t, si.State.Bytes <= 10*1024*1024) } + +func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { + for _, st := range []StorageType{FileStorage, MemoryStorage} { + t.Run(st.String(), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc0, js0 := jsClientConnect(t, c.randomServer()) + defer nc0.Close() + + nc1, js1 := jsClientConnect(t, c.randomServer()) + defer nc1.Close() + + cfg := StreamConfig{ + Name: "KV", + Subjects: []string{"kv.>"}, + Storage: st, + Replicas: 3, + } + + req, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Do manually for now. + m, err := nc0.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + si, err := js0.StreamInfo("KV") + if err != nil { + t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data)) + } + if si == nil || si.Config.Name != "KV" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + pub := func(js nats.JetStreamContext, subj, data, seq string) { + t.Helper() + m := nats.NewMsg(subj) + m.Data = []byte(data) + m.Header.Set(JSExpectedLastSubjSeq, seq) + js.PublishMsg(m) + } + + ready := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + <-ready + pub(js0, "kv.foo", "0-0", "0") + pub(js0, "kv.foo", "0-1", "1") + pub(js0, "kv.foo", "0-2", "2") + wg.Done() + }() + + go func() { + <-ready + pub(js1, "kv.foo", "1-0", "0") + pub(js1, "kv.foo", "1-1", "1") + pub(js1, "kv.foo", "1-2", "2") + wg.Done() + }() + + time.Sleep(50 * time.Millisecond) + close(ready) + wg.Wait() + + // Read the messages. + sub, err := js0.PullSubscribe(_EMPTY_, _EMPTY_, nats.BindStream("KV")) + require_NoError(t, err) + msgs, err := sub.Fetch(10) + require_NoError(t, err) + if len(msgs) != 3 { + t.Errorf("Expected 3 messages, got %d", len(msgs)) + } + for i, m := range msgs { + if m.Header.Get(JSExpectedLastSubjSeq) != fmt.Sprint(i) { + t.Errorf("Expected %d for last sequence, got %q", i, m.Header.Get(JSExpectedLastSubjSeq)) + } + } + }) + } +} diff --git a/server/stream.go b/server/stream.go index 81c13e6590..04b6bae072 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3838,7 +3838,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Expected last sequence per subject. // If we are clustered we have prechecked seq > 0. - if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && (!isClustered || seq == 0) { + if seq, exists := getExpectedLastSeqPerSubject(hdr); exists { // TODO(dlc) - We could make a new store func that does this all in one. var smv StoreMsg var fseq uint64