diff --git a/server/client.go b/server/client.go index 8303991e425..4ffc00d989b 100644 --- a/server/client.go +++ b/server/client.go @@ -92,6 +92,7 @@ const ( msgScratchSize = 1024 msgHeadProto = "RMSG " msgHeadProtoLen = len(msgHeadProto) + msgHeadHdrProto = "HMSG " // For controlling dynamic buffer sizes. startBufSize = 512 // For INFO/CONNECT block diff --git a/server/gateway.go b/server/gateway.go index d34e666e505..d0a13532747 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2718,11 +2718,11 @@ func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte { // This should be invoked only from processInboundGatewayMsg() or // processInboundRoutedMsg() and is checking if the subject -// (c.pa.subject) has the $GNR prefix. If so, this is processed +// (c.pa.subject) has the _GR_ prefix. If so, this is processed // as a GW reply and `true` is returned to indicate to the caller // that it should stop processing. // If gateway is not enabled on this server or if the subject -// does not start with $GR, `false` is returned and caller should +// does not start with _GR_, `false` is returned and caller should // process message as usual. func (c *client) handleGatewayReply(msg []byte) (processed bool) { // Do not handle GW prefixed messages if this server does not have @@ -2821,7 +2821,12 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { // to the server where the request originated. var bufa [256]byte var buf = bufa[:0] - buf = append(buf, msgHeadProto...) + hasHdr := c.pa.hdr >= 0 + if hasHdr { + buf = append(buf, msgHeadHdrProto...) + } else { + buf = append(buf, msgHeadProto...) + } buf = append(buf, acc.Name...) buf = append(buf, ' ') buf = append(buf, orgSubject...) @@ -2830,11 +2835,14 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { buf = append(buf, c.pa.reply...) buf = append(buf, ' ') } + if hasHdr { + buf = append(buf, c.pa.hdb...) + buf = append(buf, ' ') + } buf = append(buf, c.pa.szb...) mhEnd := len(buf) buf = append(buf, _CRLF_...) buf = append(buf, msg...) - route.mu.Lock() route.enqueueProto(buf) if route.trace { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index d98005657b4..22ffe9b8381 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -6525,6 +6525,64 @@ func TestJetStreamClusterSuperClusterGetNextRewrite(t *testing.T) { } } +func TestJetStreamClusterSuperClusterPullConsumerAndHeaders(t *testing.T) { + sc := createJetStreamSuperCluster(t, 3, 2) + defer sc.shutdown() + + c1 := sc.clusterForName("C1") + c2 := sc.clusterForName("C2") + + nc, js := jsClientConnect(t, c1.randomServer()) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "ORIGIN"}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + toSend := 50 + for i := 0; i < toSend; i++ { + if _, err := js.Publish("ORIGIN", []byte("ok")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + nc2, js2 := jsClientConnect(t, c2.randomServer()) + defer nc2.Close() + + _, err := js2.AddStream(&nats.StreamConfig{ + Name: "S", + Sources: []*nats.StreamSource{{Name: "ORIGIN"}}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Wait for them to be in the sourced stream. + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + if si, _ := js2.StreamInfo("S"); si.State.Msgs != uint64(toSend) { + return fmt.Errorf("Expected %d msgs for %q, got %d", toSend, "S", si.State.Msgs) + } + return nil + }) + + // Now create a pull consumers for the sourced stream. + _, err = js2.AddConsumer("S", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Now we will connect and request the next message from each server in C1 cluster and check that headers remain in place. + for _, s := range c1.servers { + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + m, err := nc.Request("$JS.API.CONSUMER.MSG.NEXT.S.dlc", nil, 2*time.Second) + if len(m.Header) != 1 { + fmt.Printf("m is %+v\n", m) + t.Fatalf("Expected 1 header element, got %+v", m.Header) + } + } +} + func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) { c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 33133, false) defer c.shutdown()