Skip to content

Commit

Permalink
Fixes to RespondMsg usage when crossing accounts
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Jul 8, 2021
1 parent 28a0a1b commit 332e394
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 26 deletions.
8 changes: 6 additions & 2 deletions nats.go
Expand Up @@ -4199,11 +4199,15 @@ func (m *Msg) RespondMsg(msg *Msg) error {
Header: msg.Header,
Data: msg.Data,
}
// Discard the reply inbox unless it is set to be a different one.
// Discard the reply inbox unless it is set to be a different one,
// for example when using it like this:
//
// m.Data = []byte("response")
// m.Respond(m)
//
if msg.Reply != m.Reply {
resp.Reply = msg.Reply
}
fmt.Printf(">>>>>>>>>>>>>>>>>>>>>>> %+v\n", string(resp.Data))
// No need to check the connection here since the call to publish will do all the checking.
return nc.PublishMsg(resp)
}
Expand Down
79 changes: 55 additions & 24 deletions test/auth_test.go
Expand Up @@ -371,19 +371,18 @@ func TestCrossAccountRespond(t *testing.T) {
permissions = {
subscribe = ["foo", "bar", "quux", "_INBOX.>"]
publish = ["_INBOX.>", "_R_.>"]
allow_responses = {max: 10, ttl: "1000ms"}
}
}
]
exports [
{ service: "foo" }
{ service: "bar" }
{ service: "quux" }
# Multiple responses
{ service: "quux", response: "stream", threshold: "1s" }
]
},
B: {
users: [ { user: b, password: b }
]
users: [ { user: b, password: b } ]
imports [
{ service: { subject: "foo", account: A } }
{ service: { subject: "bar", account: A } }
Expand Down Expand Up @@ -482,35 +481,37 @@ func TestCrossAccountRespond(t *testing.T) {
if !ok {
t.Error("Missing header in response")
}

// Server injects this header.
_, ok = resp.Header["Nats-Request-Info"]
if !ok {
t.Error("Missing header in response")
}
})

t.Run("multiple RespondMsg", func(t *testing.T) {
t.Run("multiple RespondMsg stream", func(t *testing.T) {
_, err := ncA.Subscribe("quux", func(m *nats.Msg) {
t.Logf("%+v", m)
m.Data = []byte("start")
m.Header["Task-Progress"] = []string{"0%"}
err = m.RespondMsg(m)
if err != nil {
errCh <- err
return
}
t.Logf("%+v", m)

m.Header["Task-Progress"] = []string{"50%"}
m.Data = []byte("wip")
err = m.RespondMsg(m)
if err != nil {
errCh <- err
return
}

t.Logf("%+v", m)

m.Header["Task-Progress"] = []string{"100%"}
m.Data = []byte("done")
err = m.RespondMsg(m)
if err != nil {
errCh <- err
return
}
t.Logf("%+v", m)
})
if err != nil {
t.Fatal(err)
Expand All @@ -524,37 +525,67 @@ func TestCrossAccountRespond(t *testing.T) {
return
}
ncB.Flush()
ncB.PublishRequest("quux", inbox, []byte("start"))

resp, err := responses.NextMsg(2 * time.Second)
err = ncB.PublishRequest("quux", inbox, []byte("start"))
if err != nil {
t.Fatal(err)
t.Error(err)
}

getNext := func(t *testing.T, sub *nats.Subscription) *nats.Msg {
resp, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Fatal(err)
}
if len(resp.Header) != 2 {
t.Errorf("Expected original headers as well, got: %d", len(resp.Header))
}
return resp
}

resp := getNext(t, responses)
got = string(resp.Data)
expect = "start"
if got != expect {
t.Errorf("Expected %v, got: %v", expect, got)
}

resp, err = responses.NextMsg(2 * time.Second)
if err != nil {
t.Fatal(err)
v, ok := resp.Header["Task-Progress"]
if !ok {
t.Error("Missing header in response")
}
got = v[0]
if got != "0%" {
t.Errorf("Unexpected value in header, got: %v", got)
}

// Following times out?
resp = getNext(t, responses)
got = string(resp.Data)
expect = "wip"
if got != expect {
t.Errorf("Expected %v, got: %v", expect, got)
}

resp, err = responses.NextMsg(2 * time.Second)
if err != nil {
t.Fatal(err)
v, ok = resp.Header["Task-Progress"]
if !ok {
t.Error("Missing header in response")
}
got = v[0]
if got != "50%" {
t.Errorf("Unexpected value in header, got: %v", got)
}

resp = getNext(t, responses)
got = string(resp.Data)
expect = "done"
if got != expect {
t.Errorf("Expected %v, got: %v", expect, got)
}
v, ok = resp.Header["Task-Progress"]
if !ok {
t.Error("Missing header in response")
}
got = v[0]
if got != "100%" {
t.Errorf("Unexpected value in header, got: %v", got)
}
})

select {
Expand Down

0 comments on commit 332e394

Please sign in to comment.