Skip to content

Commit

Permalink
Merge f684421 into 08c04fd
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Sep 6, 2022
2 parents 08c04fd + f684421 commit 13fdfb1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
3 changes: 0 additions & 3 deletions js_test.go
Expand Up @@ -1063,9 +1063,6 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
msg.Header.Set("some", "header")
check("missing stream")

msg.Header.Set(JSStream, "other")
check("stream header is 'other', not 'test'")

msg.Header.Set(JSStream, "test")
check("missing sequence")

Expand Down
9 changes: 5 additions & 4 deletions jsm.go
Expand Up @@ -912,7 +912,7 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt

var apiSubj string

doDirectGetLastBySubject := o.directGet && mreq.LastFor != ""
doDirectGetLastBySubject := o.directGet && mreq.LastFor != _EMPTY_

if doDirectGetLastBySubject {
apiSubj = apiDirectMsgGetLastBySubjectT
Expand Down Expand Up @@ -1006,9 +1006,10 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
if stream == _EMPTY_ {
return nil, fmt.Errorf("nats: missing stream header")
}
if stream != name {
return nil, fmt.Errorf("nats: response stream header is '%s', not '%s'", stream, name)
}

// Mirrors can now answer direct gets, so removing check for name equality.
// TODO(dlc) - We could have server also have a header with origin and check that?

seqStr := r.Header.Get(JSSequence)
if seqStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing sequence header")
Expand Down
35 changes: 35 additions & 0 deletions kv_test.go
Expand Up @@ -168,3 +168,38 @@ func TestKeyValueRePublish(t *testing.T) {
t.Fatalf("Expected subject header %q, got %q", expected, v)
}
}

func TestKeyValueMirrorDirectGet(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "TEST"})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}
_, err = js.AddStream(&StreamConfig{
Name: "MIRROR",
Mirror: &StreamSource{Name: "KV_TEST"},
MirrorDirect: true,
})
if err != nil {
t.Fatalf("Error creating mirror: %v", err)
}

for i := 0; i < 100; i++ {
key := fmt.Sprintf("KEY.%d", i)
if _, err := kv.PutString(key, "42"); err != nil {
t.Fatalf("Error adding key: %v", err)
}
}

// Make sure all gets work.
for i := 0; i < 100; i++ {
if _, err := kv.Get("KEY.22"); err != nil {
t.Fatalf("Got error getting key: %v", err)
}
}
}

0 comments on commit 13fdfb1

Please sign in to comment.