From e7bf1b3132ba05bd004261046bcfdad3cee41e0d Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Tue, 18 Jul 2023 11:10:51 -0400 Subject: [PATCH 1/3] Add test case for concurrent expected last subject sequence Signed-off-by: Byron Ruth --- server/jetstream_test.go | 87 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index be2509e406..97d04537c5 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11461,6 +11461,93 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) { } } +func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { + for _, st := range []StorageType{FileStorage, MemoryStorage} { + t.Run(st.String(), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc0, js0 := jsClientConnect(t, c.randomServer()) + defer nc0.Close() + + nc1, js1 := jsClientConnect(t, c.randomServer()) + defer nc1.Close() + + cfg := StreamConfig{ + Name: "KV", + Subjects: []string{"kv.>"}, + Storage: st, + Replicas: 3, + } + + req, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Do manually for now. + m, err := nc0.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + si, err := js0.StreamInfo("KV") + if err != nil { + t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data)) + } + if si == nil || si.Config.Name != "KV" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + pub := func(js nats.JetStreamContext, subj, data, seq string) { + t.Helper() + m := nats.NewMsg(subj) + m.Data = []byte(data) + m.Header.Set(JSExpectedLastSubjSeq, seq) + js.PublishMsg(m) + } + + ready := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + <-ready + pub(js0, "kv.foo", "0-0", "0") + pub(js0, "kv.foo", "0-1", "1") + pub(js0, "kv.foo", "0-2", "2") + wg.Done() + }() + + go func() { + <-ready + pub(js1, "kv.foo", "1-0", "0") + pub(js1, "kv.foo", "1-1", "1") + pub(js1, "kv.foo", "1-2", "2") + wg.Done() + }() + + time.Sleep(10 * time.Millisecond) + close(ready) + wg.Wait() + + // Read the messages. + sub, err := js0.PullSubscribe("", "", nats.BindStream("KV")) + require_NoError(t, err) + msgs, err := sub.Fetch(10) + require_NoError(t, err) + if len(msgs) != 3 { + t.Errorf("Expected 3 messages, got %d", len(msgs)) + } + for i, m := range msgs { + md, _ := m.Metadata() + t.Logf("Seq: %d", md.Sequence.Stream) + t.Logf("Data: %s", m.Data) + t.Logf("Header: %v", m.Header) + if m.Header.Get(JSExpectedLastSubjSeq) != fmt.Sprint(i) { + t.Errorf("Expected %d for last sequence, got %q", i, m.Header.Get(JSExpectedLastSubjSeq)) + } + } + }) + } +} + func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() From 360f807ceff0c257d2bc43e1757f65e55b917fa4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 18 Jul 2023 11:29:26 -0700 Subject: [PATCH 2/3] Moved to end for merge with other branches, minor changes Signed-off-by: Derek Collison --- server/jetstream_test.go | 170 +++++++++++++++++++-------------------- 1 file changed, 83 insertions(+), 87 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 97d04537c5..611aeb59d8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11461,93 +11461,6 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) { } } -func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { - for _, st := range []StorageType{FileStorage, MemoryStorage} { - t.Run(st.String(), func(t *testing.T) { - c := createJetStreamClusterExplicit(t, "JSC", 3) - defer c.shutdown() - - nc0, js0 := jsClientConnect(t, c.randomServer()) - defer nc0.Close() - - nc1, js1 := jsClientConnect(t, c.randomServer()) - defer nc1.Close() - - cfg := StreamConfig{ - Name: "KV", - Subjects: []string{"kv.>"}, - Storage: st, - Replicas: 3, - } - - req, err := json.Marshal(cfg) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - // Do manually for now. - m, err := nc0.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - require_NoError(t, err) - si, err := js0.StreamInfo("KV") - if err != nil { - t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data)) - } - if si == nil || si.Config.Name != "KV" { - t.Fatalf("StreamInfo is not correct %+v", si) - } - - pub := func(js nats.JetStreamContext, subj, data, seq string) { - t.Helper() - m := nats.NewMsg(subj) - m.Data = []byte(data) - m.Header.Set(JSExpectedLastSubjSeq, seq) - js.PublishMsg(m) - } - - ready := make(chan struct{}) - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - <-ready - pub(js0, "kv.foo", "0-0", "0") - pub(js0, "kv.foo", "0-1", "1") - pub(js0, "kv.foo", "0-2", "2") - wg.Done() - }() - - go func() { - <-ready - pub(js1, "kv.foo", "1-0", "0") - pub(js1, "kv.foo", "1-1", "1") - pub(js1, "kv.foo", "1-2", "2") - wg.Done() - }() - - time.Sleep(10 * time.Millisecond) - close(ready) - wg.Wait() - - // Read the messages. - sub, err := js0.PullSubscribe("", "", nats.BindStream("KV")) - require_NoError(t, err) - msgs, err := sub.Fetch(10) - require_NoError(t, err) - if len(msgs) != 3 { - t.Errorf("Expected 3 messages, got %d", len(msgs)) - } - for i, m := range msgs { - md, _ := m.Metadata() - t.Logf("Seq: %d", md.Sequence.Stream) - t.Logf("Data: %s", m.Data) - t.Logf("Header: %v", m.Header) - if m.Header.Get(JSExpectedLastSubjSeq) != fmt.Sprint(i) { - t.Errorf("Expected %d for last sequence, got %q", i, m.Header.Get(JSExpectedLastSubjSeq)) - } - } - }) - } -} - func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() @@ -20369,3 +20282,86 @@ func TestJetStreamMaxBytesIgnored(t *testing.T) { require_NoError(t, err) require_True(t, si.State.Bytes <= 10*1024*1024) } + +func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { + for _, st := range []StorageType{FileStorage, MemoryStorage} { + t.Run(st.String(), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc0, js0 := jsClientConnect(t, c.randomServer()) + defer nc0.Close() + + nc1, js1 := jsClientConnect(t, c.randomServer()) + defer nc1.Close() + + cfg := StreamConfig{ + Name: "KV", + Subjects: []string{"kv.>"}, + Storage: st, + Replicas: 3, + } + + req, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Do manually for now. + m, err := nc0.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + si, err := js0.StreamInfo("KV") + if err != nil { + t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data)) + } + if si == nil || si.Config.Name != "KV" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + pub := func(js nats.JetStreamContext, subj, data, seq string) { + t.Helper() + m := nats.NewMsg(subj) + m.Data = []byte(data) + m.Header.Set(JSExpectedLastSubjSeq, seq) + js.PublishMsg(m) + } + + ready := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + <-ready + pub(js0, "kv.foo", "0-0", "0") + pub(js0, "kv.foo", "0-1", "1") + pub(js0, "kv.foo", "0-2", "2") + wg.Done() + }() + + go func() { + <-ready + pub(js1, "kv.foo", "1-0", "0") + pub(js1, "kv.foo", "1-1", "1") + pub(js1, "kv.foo", "1-2", "2") + wg.Done() + }() + + time.Sleep(50 * time.Millisecond) + close(ready) + wg.Wait() + + // Read the messages. + sub, err := js0.PullSubscribe(_EMPTY_, _EMPTY_, nats.BindStream("KV")) + require_NoError(t, err) + msgs, err := sub.Fetch(10) + require_NoError(t, err) + if len(msgs) != 3 { + t.Errorf("Expected 3 messages, got %d", len(msgs)) + } + for i, m := range msgs { + if m.Header.Get(JSExpectedLastSubjSeq) != fmt.Sprint(i) { + t.Errorf("Expected %d for last sequence, got %q", i, m.Header.Get(JSExpectedLastSubjSeq)) + } + } + }) + } +} From 244dda809cadf1deff33d08821696ba42b85cba5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 18 Jul 2023 11:29:59 -0700 Subject: [PATCH 3/3] Fix bug that would race around check for last sequence per subject Signed-off-by: Derek Collison --- server/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/stream.go b/server/stream.go index 81c13e6590..04b6bae072 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3838,7 +3838,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Expected last sequence per subject. // If we are clustered we have prechecked seq > 0. - if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && (!isClustered || seq == 0) { + if seq, exists := getExpectedLastSeqPerSubject(hdr); exists { // TODO(dlc) - We could make a new store func that does this all in one. var smv StoreMsg var fseq uint64