Skip to content

Commit

Permalink
Merge f15c190 into d638893
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed May 26, 2020
2 parents d638893 + f15c190 commit e33b365
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 42 deletions.
4 changes: 2 additions & 2 deletions enc.go
Expand Up @@ -93,7 +93,7 @@ func (c *EncodedConn) Publish(subject string, v interface{}) error {
if err != nil {
return err
}
return c.Conn.publish(subject, _EMPTY_, b)
return c.Conn.publish(subject, _EMPTY_, nil, b)
}

// PublishRequest will perform a Publish() expecting a response on the
Expand All @@ -104,7 +104,7 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
if err != nil {
return err
}
return c.Conn.publish(subject, reply, b)
return c.Conn.publish(subject, reply, nil, b)
}

// Request will create an Inbox and perform a Request() call
Expand Down
5 changes: 4 additions & 1 deletion go.mod
@@ -1,7 +1,10 @@
module github.com/nats-io/nats.go

require (
github.com/nats-io/jwt v0.3.2
github.com/golang/protobuf v1.4.2
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093
github.com/nats-io/nkeys v0.1.4
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)
30 changes: 30 additions & 0 deletions go.sum
@@ -1,5 +1,23 @@
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093 h1:ii4KAXLYB3f7A6VnBhRWsYP+x45C+GAXh5T2VQWLfgQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
Expand All @@ -10,6 +28,18 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
174 changes: 138 additions & 36 deletions nats.go
Expand Up @@ -27,6 +27,8 @@ import (
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/textproto"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -120,6 +122,8 @@ var (
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
)

func init() {
Expand Down Expand Up @@ -494,6 +498,7 @@ type Subscription struct {
type Msg struct {
Subject string
Reply string
Header http.Header
Data []byte
Sub *Subscription
next *Msg
Expand Down Expand Up @@ -525,13 +530,15 @@ type srv struct {
tlsName string
}

// The INFO block received from the server.
type serverInfo struct {
ID string `json:"server_id"`
Host string `json:"host"`
Port uint `json:"port"`
Version string `json:"version"`
AuthRequired bool `json:"auth_required"`
TLSRequired bool `json:"tls_required"`
Headers bool `json:"headers"`
MaxPayload int64 `json:"max_payload"`
ConnectURLs []string `json:"connect_urls,omitempty"`
Proto int `json:"proto,omitempty"`
Expand Down Expand Up @@ -564,6 +571,7 @@ type connectInfo struct {
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
}

// MsgHandler is a callback function that processes messages delivered to
Expand Down Expand Up @@ -1077,10 +1085,11 @@ func (o Options) Connect() (*Conn, error) {
}

const (
_CRLF_ = "\r\n"
_EMPTY_ = ""
_SPC_ = " "
_PUB_P_ = "PUB "
_CRLF_ = "\r\n"
_EMPTY_ = ""
_SPC_ = " "
_PUB_P_ = "PUB "
_HPUB_P_ = "HPUB "
)

const (
Expand All @@ -1091,12 +1100,12 @@ const (
)

const (
conProto = "CONNECT %s" + _CRLF_
pingProto = "PING" + _CRLF_
pongProto = "PONG" + _CRLF_
subProto = "SUB %s %s %d" + _CRLF_
unsubProto = "UNSUB %d %s" + _CRLF_
okProto = _OK_OP_ + _CRLF_
connectProto = "CONNECT %s" + _CRLF_
pingProto = "PING" + _CRLF_
pongProto = "PONG" + _CRLF_
subProto = "SUB %s %s %d" + _CRLF_
unsubProto = "UNSUB %d %s" + _CRLF_
okProto = _OK_OP_ + _CRLF_
)

// Return the currently selected server
Expand Down Expand Up @@ -1444,9 +1453,9 @@ func (nc *Conn) setup() {
nc.fch = make(chan struct{}, flushChanSize)
nc.rqch = make(chan struct{})

// Setup scratch outbound buffer for PUB
pub := nc.scratch[:len(_PUB_P_)]
copy(pub, _PUB_P_)
// Setup scratch outbound buffer for PUB/HPUB
pub := nc.scratch[:len(_HPUB_P_)]
copy(pub, _HPUB_P_)
}

// Process a connected connection and initialize properly.
Expand Down Expand Up @@ -1660,7 +1669,7 @@ func (nc *Conn) connectProto() (string, error) {
}

cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho}
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true}

b, err := json.Marshal(cinfo)
if err != nil {
Expand All @@ -1672,7 +1681,7 @@ func (nc *Conn) connectProto() (string, error) {
return _EMPTY_, ErrNoEchoNotSupported
}

return fmt.Sprintf(conProto, b), nil
return fmt.Sprintf(connectProto, b), nil
}

// normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.
Expand Down Expand Up @@ -2264,8 +2273,27 @@ func (nc *Conn) processMsg(data []byte) {
msgPayload := make([]byte, len(data))
copy(msgPayload, data)

// Check if we have headers encoded here.
var h http.Header
var err error

if nc.ps.ma.hdr > 0 {
hbuf := msgPayload[:nc.ps.ma.hdr]
msgPayload = msgPayload[nc.ps.ma.hdr:]
h, err = decodeHeadersMsg(hbuf)
if err != nil {
// We will pass the message through but send async error.
nc.mu.Lock()
nc.err = ErrBadHeaderMsg
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrBadHeaderMsg) })
}
nc.mu.Unlock()
}
}

// FIXME(dlc): Should we recycle these containers?
m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
m := &Msg{Header: h, Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}

sub.mu.Lock()

Expand Down Expand Up @@ -2601,7 +2629,41 @@ func (nc *Conn) kickFlusher() {
// argument is left untouched and needs to be correctly interpreted on
// the receiver.
func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, data)
return nc.publish(subj, _EMPTY_, nil, data)
}

// Used to create a new message for publishing that will use headers.
func NewMsg(subject string) *Msg {
return &Msg{
Subject: subject,
Header: make(http.Header),
}
}

const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
)

// decodeHeadersMsg will decode and headers.
func decodeHeadersMsg(data []byte) (http.Header, error) {
// FIXME(dlc) - brittle and slow probably.
br := bufio.NewReader(bytes.NewBuffer(data))
hl, err := br.ReadString('\n')
if err != nil {
return nil, err
}
// FIXME(dlc) - pull status and description?
if !strings.HasPrefix(hl, hdrLine[:hdrPreEnd]) {
return nil, ErrBadHeaderMsg
}
tp := textproto.NewReader(br)
mh, err := tp.ReadMIMEHeader()
if err != nil {
return nil, ErrBadHeaderMsg
}
return http.Header(mh), nil
}

// PublishMsg publishes the Msg structure, which includes the
Expand All @@ -2610,14 +2672,26 @@ func (nc *Conn) PublishMsg(m *Msg) error {
if m == nil {
return ErrInvalidMsg
}
return nc.publish(m.Subject, m.Reply, m.Data)
var hdr []byte
if len(m.Header) > 0 {
if !nc.info.Headers {
return ErrHeadersNotSupported
}
// FIXME(dlc) - Optimize
var b bytes.Buffer
b.WriteString(hdrLine)
m.Header.Write(&b)
b.WriteString(crlf)
hdr = b.Bytes()
}
return nc.publish(m.Subject, m.Reply, hdr, m.Data)
}

// PublishRequest will perform a Publish() expecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
return nc.publish(subj, reply, data)
return nc.publish(subj, reply, nil, data)
}

// Used for handrolled itoa
Expand All @@ -2626,7 +2700,7 @@ const digits = "0123456789"
// publish is the internal function to publish messages to a nats-server.
// Sends a protocol data message by queuing into the bufio writer
// and kicking the flush go routine. These writes should be protected.
func (nc *Conn) publish(subj, reply string, data []byte) error {
func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
if nc == nil {
return ErrInvalidConnection
}
Expand All @@ -2646,7 +2720,7 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
}

// Proactively reject payloads over the threshold set by server.
msgSize := int64(len(data))
msgSize := int64(len(data) + len(hdr))
if msgSize > nc.info.MaxPayload {
nc.mu.Unlock()
return ErrMaxPayload
Expand All @@ -2664,37 +2738,65 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
}
}

msgh := nc.scratch[:len(_PUB_P_)]
msgh = append(msgh, subj...)
msgh = append(msgh, ' ')
var mh []byte
if hdr != nil {
mh = nc.scratch[:len(_HPUB_P_)]
} else {
mh = nc.scratch[1:len(_HPUB_P_)]
}
mh = append(mh, subj...)
mh = append(mh, ' ')
if reply != "" {
msgh = append(msgh, reply...)
msgh = append(msgh, ' ')
mh = append(mh, reply...)
mh = append(mh, ' ')
}

// We could be smarter here, but simple loop is ok,
// just avoid strconv in fast path
// just avoid strconv in fast path.
// FIXME(dlc) - Find a better way here.
// msgh = strconv.AppendInt(msgh, int64(len(data)), 10)
// go 1.14 some values strconv faster, may be able to switch over.

var b [12]byte
var i = len(b)
if len(data) > 0 {
for l := len(data); l > 0; l /= 10 {
i -= 1

if hdr != nil {
if len(hdr) > 0 {
for l := len(hdr); l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
} else {
i--
b[i] = digits[0]
}
mh = append(mh, b[i:]...)
mh = append(mh, ' ')
// reset for below.
i = len(b)
}

if msgSize > 0 {
for l := msgSize; l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
} else {
i -= 1
i--
b[i] = digits[0]
}

msgh = append(msgh, b[i:]...)
msgh = append(msgh, _CRLF_...)
mh = append(mh, b[i:]...)
mh = append(mh, _CRLF_...)

_, err := nc.bw.Write(msgh)
_, err := nc.bw.Write(mh)
if err == nil {
_, err = nc.bw.Write(data)
if hdr != nil {
_, err = nc.bw.Write(hdr)
}
if err == nil {
_, err = nc.bw.Write(data)
}
}
if err == nil {
_, err = nc.bw.WriteString(_CRLF_)
Expand All @@ -2705,7 +2807,7 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
}

nc.OutMsgs++
nc.OutBytes += uint64(len(data))
nc.OutBytes += uint64(len(data) + len(hdr))

if len(nc.fch) == 0 {
nc.kickFlusher()
Expand Down

0 comments on commit e33b365

Please sign in to comment.