Skip to content

Commit

Permalink
Added generic header support
Browse files Browse the repository at this point in the history
This is meant to mirror HTTP headers as much as possible for interop not only between our clients but HTTP clients in general. I actually used the http.Header directly, so we get canonical names, etc.

I thought about adding in a way to add the data field to a message such that you could detect mime-type and content-length etc. But Go's builtin type detection is not that good, so stuck with basics.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 27, 2020
1 parent 076024f commit 965d31d
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 3 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
module github.com/nats-io/nats.go

go 1.14

require (
github.com/golang/protobuf v1.4.0
github.com/nats-io/jwt v0.3.2
github.com/nats-io/nats-server/v2 v2.1.6
github.com/nats-io/nkeys v0.1.4
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.21.0
)
22 changes: 22 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA=
github.com/nats-io/nats-server/v2 v2.1.6 h1:qAaHZaS8pRRNQLFaiBA1rq5WynyEGp9DFgmMfoaiXGY=
github.com/nats-io/nats-server/v2 v2.1.6/go.mod h1:BL1NOtaBQ5/y97djERRVWNouMW7GT3gxnmbE/eC8u8A=
github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
Expand All @@ -12,4 +26,12 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
90 changes: 87 additions & 3 deletions nats.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -27,6 +27,8 @@ import (
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/textproto"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -116,6 +118,7 @@ var (
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
)

func init() {
Expand Down Expand Up @@ -463,6 +466,7 @@ type Subscription struct {
type Msg struct {
Subject string
Reply string
Header http.Header
Data []byte
Sub *Subscription
next *Msg
Expand Down Expand Up @@ -2184,8 +2188,30 @@ func (nc *Conn) processMsg(data []byte) {
msgPayload := make([]byte, len(data))
copy(msgPayload, data)

// Check if we have headers encoded here.
var h http.Header
var err error

if bytes.HasPrefix(msgPayload, []byte(hdrMagic)) {
h, msgPayload, err = decodeHeadersMsg(msgPayload)
if err != nil {
// FIXME(dlc) - Async error, add to dropped etc.
sub.mu.Lock()
sub.dropped++
sub.mu.Unlock()
nc.subsMu.RUnlock()
nc.mu.Lock()
nc.err = ErrBadHeaderMsg
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrBadHeaderMsg) })
}
nc.mu.Unlock()
return
}
}

// FIXME(dlc): Should we recycle these containers?
m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
m := &Msg{Header: h, Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}

sub.mu.Lock()

Expand Down Expand Up @@ -2518,13 +2544,71 @@ func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, data)
}

// Used to create a new message for publishing that will use headers.
func NewMsg(subject string) *Msg {
return &Msg{
Subject: subject,
Header: make(http.Header),
}
}

const hdrMagic = "⚡NATS⚡/0.1\r\n"
const crlf = "\r\n"
const hterm = "\r\n\r\n"

// decodeHeadersMsg will decode and headers and adjust the body of the message.
// This should be called only when the hdrMagic has been detected.
func decodeHeadersMsg(data []byte) (http.Header, []byte, error) {
// FIXME(dlc) - brittle and slow probably.
data = data[len(hdrMagic):]
var hendi int
if hendi = bytes.Index(data, []byte(hterm)); hendi < 0 {
return nil, nil, ErrBadHeaderMsg
}
br := bufio.NewReader(bytes.NewBuffer(data))
tp := textproto.NewReader(br)
mh, err := tp.ReadMIMEHeader()
if err != nil {
return nil, nil, ErrBadHeaderMsg
}
data = data[hendi+len(hterm):]
return http.Header(mh), data, nil
}

// encodeHeadersMsg will write our header and all the headers out in
// a canonical HTTP way and append the original data payload.
// TODO(dlc) - optimize
func encodeHeadersMsg(m *Msg) ([]byte, error) {
if m.Header == nil {
return m.Data, nil
}
var b bytes.Buffer
b.WriteString(hdrMagic)
m.Header.Write(&b)
b.WriteString(crlf)
b.Write(m.Data)
return b.Bytes(), nil
}

// PublishMsg publishes the Msg structure, which includes the
// Subject, an optional Reply and an optional Data field.
func (nc *Conn) PublishMsg(m *Msg) error {
if m == nil {
return ErrInvalidMsg
}
return nc.publish(m.Subject, m.Reply, m.Data)

var data []byte
var err error

if m.Header != nil {
data, err = encodeHeadersMsg(m)
if err != nil {
return err
}
} else {
data = m.Data
}
return nc.publish(m.Subject, m.Reply, data)
}

// PublishRequest will perform a Publish() excpecting a response on the
Expand Down
93 changes: 93 additions & 0 deletions test/headers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
"reflect"
"testing"
"time"

"github.com/nats-io/nats.go"
)

func TestBasicHeaders(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

subject := "headers.test"
sub, err := nc.SubscribeSync(subject)
if err != nil {
t.Fatalf("Could not subscribe to %q: %v", subject, err)
}
defer sub.Unsubscribe()

m := nats.NewMsg(subject)
m.Header.Add("Accept-Encoding", "json")
m.Header.Add("Authorization", "s3cr3t")
m.Data = []byte("Hello Headers!")

nc.PublishMsg(m)
msg, _ := sub.NextMsg(time.Second)

// Blank out the sub since its not present in the original.
msg.Sub = nil
if !reflect.DeepEqual(m, msg) {
t.Fatalf("Messages did not match! \n%+v\n%+v\n", m, msg)
}
}

// Make sure we do the right thing with a badly encoded header msg.
func TestBadHeaderMsg(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

ech := make(chan error)

nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
ech <- err
}))
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

subject := "headers.test"
sub, err := nc.SubscribeSync(subject)
if err != nil {
t.Fatalf("Could not subscribe to %q: %v", subject, err)
}
defer sub.Unsubscribe()

bad := []byte("⚡NATS⚡/0.1\r\nAccept-Encoding: json\r\nAutho Bad Headers!")

nc.Publish(subject, bad)

tm := time.NewTimer(5 * time.Second)
defer tm.Stop()

select {
case err := <-ech:
if err != nats.ErrBadHeaderMsg {
t.Fatalf("Did not get the error we wanted, got %v", err)
}
case <-tm.C:
t.Fatalf("Did not receive an error")
}
}

0 comments on commit 965d31d

Please sign in to comment.