Skip to content

Commit

Permalink
Merge pull request #720 from nats-io/hdr-changes
Browse files Browse the repository at this point in the history
Add nats.Header type based on http.Header
  • Loading branch information
wallyqs committed Apr 28, 2021
2 parents 109f3dd + 3f05b6a commit aa4ab64
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 54 deletions.
5 changes: 2 additions & 3 deletions js.go
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -283,7 +282,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
m.Header = http.Header{}
m.Header = Header{}
}
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
Expand Down Expand Up @@ -584,7 +583,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
m.Header = http.Header{}
m.Header = Header{}
}
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions jsm.go
Expand Up @@ -18,7 +18,6 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
)
Expand Down Expand Up @@ -709,7 +708,7 @@ type apiMsgGetRequest struct {
type RawStreamMsg struct {
Subject string
Sequence uint64
Header http.Header
Header Header
Data []byte
Time time.Time
}
Expand Down Expand Up @@ -765,7 +764,7 @@ func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, err

msg := resp.Message

var hdr http.Header
var hdr Header
if msg.Header != nil {
hdr, err = decodeHeadersMsg(msg.Header)
if err != nil {
Expand Down
53 changes: 47 additions & 6 deletions nats.go
Expand Up @@ -582,7 +582,7 @@ type Subscription struct {
type Msg struct {
Subject string
Reply string
Header http.Header
Header Header
Data []byte
Sub *Subscription
next *Msg
Expand All @@ -602,7 +602,7 @@ func (m *Msg) headerBytes() ([]byte, error) {
return nil, ErrBadHeaderMsg
}

err = m.Header.Write(&b)
err = http.Header(m.Header).Write(&b)
if err != nil {
return nil, ErrBadHeaderMsg
}
Expand Down Expand Up @@ -2605,7 +2605,7 @@ func (nc *Conn) processMsg(data []byte) {
copy(msgPayload, data)

// Check if we have headers encoded here.
var h http.Header
var h Header
var err error
var ctrl bool
var hasFC bool
Expand Down Expand Up @@ -3001,11 +3001,52 @@ func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, nil, data)
}

// Header represents the optional Header for a NATS message,
// based on the implementation of http.Header.
type Header map[string][]string

// Add adds the key, value pair to the header. It is case-sensitive
// and appends to any existing values associated with key.
func (h Header) Add(key, value string) {
h[key] = append(h[key], value)
}

// Set sets the header entries associated with key to the single
// element value. It is case-sensitive and replaces any existing
// values associated with key.
func (h Header) Set(key, value string) {
h[key] = []string{value}
}

// Get gets the first value associated with the given key.
// It is case-sensitive.
func (h Header) Get(key string) string {
if h == nil {
return _EMPTY_
}
if v := h[key]; v != nil {
return v[0]
}
return _EMPTY_
}

// Values returns all values associated with the given key.
// It is case-sensitive.
func (h Header) Values(key string) []string {
return h[key]
}

// Del deletes the values associated with a key.
// It is case-sensitive.
func (h Header) Del(key string) {
delete(h, key)
}

// NewMsg creates a message for publishing that will use headers.
func NewMsg(subject string) *Msg {
return &Msg{
Subject: subject,
Header: make(http.Header),
Header: make(Header),
}
}

Expand All @@ -3024,7 +3065,7 @@ const (
)

// decodeHeadersMsg will decode and headers.
func decodeHeadersMsg(data []byte) (http.Header, error) {
func decodeHeadersMsg(data []byte) (Header, error) {
tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
l, err := tp.ReadLine()
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
Expand All @@ -3049,7 +3090,7 @@ func decodeHeadersMsg(data []byte) (http.Header, error) {
mh.Add(descrHdr, description)
}
}
return http.Header(mh), nil
return Header(mh), nil
}

// readMIMEHeader returns a MIMEHeader that preserves the
Expand Down
47 changes: 47 additions & 0 deletions nats_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"reflect"
Expand Down Expand Up @@ -2482,6 +2483,52 @@ func TestHeaderParser(t *testing.T) {
checkStatus("NATS/1.0 404 No Messages", 404, "No Messages")
}

func TestHeaderMultiLine(t *testing.T) {
m := NewMsg("foo")
m.Header = Header{
"CorrelationID": []string{"123"},
"Msg-ID": []string{"456"},
"X-NATS-Keys": []string{"A", "B", "C"},
"X-Test-Keys": []string{"D", "E", "F"},
}
// Users can opt-in to canonicalize like http.Header does
// by using http.Header#Set or http.Header#Add.
http.Header(m.Header).Set("accept-encoding", "json")
http.Header(m.Header).Add("AUTHORIZATION", "s3cr3t")

// Multi Value Header becomes represented as multi-lines in the wire
// since internally using same Write from http stdlib.
m.Header.Set("X-Test", "First")
m.Header.Add("X-Test", "Second")
m.Header.Add("X-Test", "Third")

b, err := m.headerBytes()
if err != nil {
t.Fatal(err)
}
result := string(b)

expectedHeader := `NATS/1.0
Accept-Encoding: json
Authorization: s3cr3t
CorrelationID: 123
Msg-ID: 456
X-NATS-Keys: A
X-NATS-Keys: B
X-NATS-Keys: C
X-Test: First
X-Test: Second
X-Test: Third
X-Test-Keys: D
X-Test-Keys: E
X-Test-Keys: F
`
if strings.Replace(expectedHeader, "\n", "\r\n", -1) != result {
t.Fatalf("Expected: %q, got: %q", expectedHeader, result)
}
}

func TestLameDuckMode(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down

0 comments on commit aa4ab64

Please sign in to comment.