Skip to content

Commit

Permalink
When receiving a response across a gateway that has headers and a glo…
Browse files Browse the repository at this point in the history
…bally routed subject (_GR_) we were dropping header information.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 10, 2021
1 parent 00bfc1f commit f633033
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
1 change: 1 addition & 0 deletions server/client.go
Expand Up @@ -92,6 +92,7 @@ const (
msgScratchSize = 1024
msgHeadProto = "RMSG "
msgHeadProtoLen = len(msgHeadProto)
msgHeadHdrProto = "HMSG "

// For controlling dynamic buffer sizes.
startBufSize = 512 // For INFO/CONNECT block
Expand Down
16 changes: 12 additions & 4 deletions server/gateway.go
Expand Up @@ -2718,11 +2718,11 @@ func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte {

// This should be invoked only from processInboundGatewayMsg() or
// processInboundRoutedMsg() and is checking if the subject
// (c.pa.subject) has the $GNR prefix. If so, this is processed
// (c.pa.subject) has the _GR_ prefix. If so, this is processed
// as a GW reply and `true` is returned to indicate to the caller
// that it should stop processing.
// If gateway is not enabled on this server or if the subject
// does not start with $GR, `false` is returned and caller should
// does not start with _GR_, `false` is returned and caller should
// process message as usual.
func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// Do not handle GW prefixed messages if this server does not have
Expand Down Expand Up @@ -2821,7 +2821,12 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// to the server where the request originated.
var bufa [256]byte
var buf = bufa[:0]
buf = append(buf, msgHeadProto...)
hasHdr := c.pa.hdr >= 0
if hasHdr {
buf = append(buf, msgHeadHdrProto...)
} else {
buf = append(buf, msgHeadProto...)
}
buf = append(buf, acc.Name...)
buf = append(buf, ' ')
buf = append(buf, orgSubject...)
Expand All @@ -2830,11 +2835,14 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
buf = append(buf, c.pa.reply...)
buf = append(buf, ' ')
}
if hasHdr {
buf = append(buf, c.pa.hdb...)
buf = append(buf, ' ')
}
buf = append(buf, c.pa.szb...)
mhEnd := len(buf)
buf = append(buf, _CRLF_...)
buf = append(buf, msg...)

route.mu.Lock()
route.enqueueProto(buf)
if route.trace {
Expand Down
58 changes: 58 additions & 0 deletions server/jetstream_cluster_test.go
Expand Up @@ -6525,6 +6525,64 @@ func TestJetStreamClusterSuperClusterGetNextRewrite(t *testing.T) {
}
}

func TestJetStreamClusterSuperClusterPullConsumerAndHeaders(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()

c1 := sc.clusterForName("C1")
c2 := sc.clusterForName("C2")

nc, js := jsClientConnect(t, c1.randomServer())
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{Name: "ORIGIN"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 50
for i := 0; i < toSend; i++ {
if _, err := js.Publish("ORIGIN", []byte("ok")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

nc2, js2 := jsClientConnect(t, c2.randomServer())
defer nc2.Close()

_, err := js2.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{Name: "ORIGIN"}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Wait for them to be in the sourced stream.
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if si, _ := js2.StreamInfo("S"); si.State.Msgs != uint64(toSend) {
return fmt.Errorf("Expected %d msgs for %q, got %d", toSend, "S", si.State.Msgs)
}
return nil
})

// Now create a pull consumers for the sourced stream.
_, err = js2.AddConsumer("S", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Now we will connect and request the next message from each server in C1 cluster and check that headers remain in place.
for _, s := range c1.servers {
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
m, err := nc.Request("$JS.API.CONSUMER.MSG.NEXT.S.dlc", nil, 2*time.Second)
if len(m.Header) != 1 {
fmt.Printf("m is %+v\n", m)
t.Fatalf("Expected 1 header element, got %+v", m.Header)
}
}
}

func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) {
c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 33133, false)
defer c.shutdown()
Expand Down

0 comments on commit f633033

Please sign in to comment.