Skip to content

Commit

Permalink
Merge pull request #2245 from nats-io/msgid
Browse files Browse the repository at this point in the history
[FIXED] #2242.
  • Loading branch information
derekcollison committed May 24, 2021
2 parents d4a0b87 + 8f2457f commit 11539ec
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
4 changes: 4 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,10 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error {

s, cc := js.srv, js.cluster

if cc == nil || cc.meta == nil {
return ErrJetStreamNotClustered
}

// If this is a single peer raft group or we are not a member return.
if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) {
// Nothing to do here.
Expand Down
39 changes: 39 additions & 0 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6863,6 +6863,45 @@ func TestJetStreamClusterCrossAccountInterop(t *testing.T) {
})
}

// https://github.com/nats-io/nats-server/issues/2242
func TestJetStreamClusterMsgIdDuplicateBug(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSL", 3)
defer c.shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sendMsgID := func(id string) (*nats.PubAck, error) {
t.Helper()
m := nats.NewMsg("foo")
m.Header.Add(JSMsgId, id)
m.Data = []byte("HELLO WORLD")
return js.PublishMsg(m)
}

if _, err := sendMsgID("1"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// This should fail with duplicate detected.
if pa, _ := sendMsgID("1"); pa == nil || !pa.Duplicate {
t.Fatalf("Expected duplicate but got none: %+v", pa)
}
// This should be fine.
if _, err := sendMsgID("2"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

func TestJetStreamClusterNilMsgWithHeaderThroughSourcedStream(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 12232, true)
Expand Down
11 changes: 8 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2480,7 +2480,10 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subje
}
}

var errLastSeqMismatch = errors.New("last sequence mismatch")
var (
errLastSeqMismatch = errors.New("last sequence mismatch")
errMsgIdDuplicate = errors.New("msgid is duplicate")
)

// processJetStreamMsg is where we try to actually process the stream msg.
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {
Expand Down Expand Up @@ -2555,7 +2558,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
response = append(response, ",\"duplicate\": true}"...)
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return errors.New("msgid is duplicate")
return errMsgIdDuplicate
}

// Expected stream.
Expand Down Expand Up @@ -2681,6 +2684,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Assume this will succeed.
olmsgId := mset.lmsgId
mset.lmsgId = msgId
clfs := mset.clfs
mset.lseq++

// We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions.
Expand All @@ -2692,7 +2696,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
} else {
seq = lseq + 1
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
}

Expand Down

0 comments on commit 11539ec

Please sign in to comment.