Skip to content

Commit

Permalink
Merge 585eab7 into fc6fed8
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Jun 18, 2020
2 parents fc6fed8 + 585eab7 commit 55956f4
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 57 deletions.
51 changes: 27 additions & 24 deletions context.go
Expand Up @@ -58,34 +58,37 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [
return nil, ctx.Err()
}

nc.mu.Lock()
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequestWithContext(ctx, subj, hdr, data)
}

mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
if err != nil {
return nil, err
}
var m *Msg
var err error

var ok bool
var msg *Msg
// If user wants the old style.
if nc.useOldRequestStyle() {
m, err = nc.oldRequestWithContext(ctx, subj, hdr, data)
} else {
mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
if err != nil {
return nil, err
}

select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
var ok bool

select {
case m, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
case <-ctx.Done():
nc.mu.Lock()
delete(nc.respMap, token)
nc.mu.Unlock()
return nil, ctx.Err()
}
case <-ctx.Done():
nc.mu.Lock()
delete(nc.respMap, token)
nc.mu.Unlock()
return nil, ctx.Err()
}

return msg, nil
// Check for no responder status.
if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
m, err = nil, ErrNoResponders
}
return m, err
}

// oldRequestWithContext utilizes inbox and subscription per request.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Expand Up @@ -7,17 +7,23 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed h1:nnV8Mw23aNwNpKuQWuVBEuAqyBOEY21hLWKpVdNr6dQ=
github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtKbOeM+w3vGQMNF0BEt+2xZDmVCtYXql2Ym+RWg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3 h1:aDJ5IrBlq4KHBgwWZtKXi1lvY2EkRYOiC2KQfdLTJL8=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3/go.mod h1:uXGA6y1uxwW755SK+LoDZggh+UUVsbVoxh8ZG8MqbsI=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=
Expand All @@ -35,6 +41,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
77 changes: 51 additions & 26 deletions nats.go
Expand Up @@ -123,6 +123,7 @@ var (
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
)

func init() {
Expand Down Expand Up @@ -589,21 +590,22 @@ const (
)

type connectInfo struct {
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
UserJWT string `json:"jwt,omitempty"`
Nkey string `json:"nkey,omitempty"`
Signature string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
Token string `json:"auth_token,omitempty"`
TLS bool `json:"tls_required"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
UserJWT string `json:"jwt,omitempty"`
Nkey string `json:"nkey,omitempty"`
Signature string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
Token string `json:"auth_token,omitempty"`
TLS bool `json:"tls_required"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
NoResponders bool `json:"no_responders"`
}

// MsgHandler is a callback function that processes messages delivered to
Expand Down Expand Up @@ -1711,8 +1713,10 @@ func (nc *Conn) connectProto() (string, error) {
token = nc.Opts.TokenHandler()
}

// If our server does not support headers then we can't do them or no responders.
hdrs := nc.info.Headers
cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true}
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs}

b, err := json.Marshal(cinfo)
if err != nil {
Expand Down Expand Up @@ -2689,21 +2693,28 @@ func NewMsg(subject string) *Msg {
}

const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
noResponders = "503"
)

// decodeHeadersMsg will decode and headers.
func decodeHeadersMsg(data []byte) (http.Header, error) {
tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
if l, err := tp.ReadLine(); err != nil || l != hdrLine[:hdrPreEnd] {
l, err := tp.ReadLine()
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
return nil, ErrBadHeaderMsg
}
mh, err := tp.ReadMIMEHeader()
if err != nil {
return nil, ErrBadHeaderMsg
}
// Check if we have an inlined status.
if len(l) > hdrPreEnd {
mh.Add(statusHdr, strings.TrimLeft(l[hdrPreEnd:], " "))
}
return http.Header(mh), nil
}

Expand Down Expand Up @@ -2904,6 +2915,7 @@ func (nc *Conn) respHandler(m *Msg) {

// Helper to setup and send new request style requests. Return the chan to receive the response.
func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) {
nc.mu.Lock()
// Do setup for the new style if needed.
if nc.respMap == nil {
nc.initNewResp()
Expand Down Expand Up @@ -2944,7 +2956,6 @@ func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) {
if !nc.info.Headers {
return nil, ErrHeadersNotSupported
}

hdr, err = msg.headerBytes()
if err != nil {
return nil, err
Expand All @@ -2960,18 +2971,32 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
return nc.request(subj, nil, data, timeout)
}

func (nc *Conn) useOldRequestStyle() bool {
nc.mu.RLock()
r := nc.Opts.UseOldRequestStyle
nc.mu.RUnlock()
return r
}

func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}

nc.mu.Lock()
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequest(subj, hdr, data, timeout)
var m *Msg
var err error

if nc.useOldRequestStyle() {
m, err = nc.oldRequest(subj, hdr, data, timeout)
} else {
m, err = nc.newRequest(subj, hdr, data, timeout)
}

return nc.newRequest(subj, hdr, data, timeout)
// Check for no responder status.
if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
m, err = nil, ErrNoResponders
}
return m, err
}

func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
Expand Down
49 changes: 48 additions & 1 deletion test/basic_test.go
Expand Up @@ -15,6 +15,7 @@ package test

import (
"bytes"
"context"
"fmt"
"math"
"regexp"
Expand Down Expand Up @@ -555,11 +556,54 @@ func TestRequestTimeout(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// We now need a responder by default otherwise we will get a no responders error.
nc.SubscribeSync("foo")

if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err != nats.ErrTimeout {
t.Fatalf("Expected to receive a timeout error")
}
}

func TestBasicNoRespondersSupport(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

// Normal new style
if m, err := nc.Request("foo", nil, time.Second); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}
// New style with context
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if m, err := nc.RequestWithContext(ctx, "foo", nil); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}

// Now do old request style as well.
nc, err = nats.Connect(s.ClientURL(), nats.UseOldRequestStyle())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

// Normal old request style
if m, err := nc.Request("foo", nil, time.Second); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}
// Old request style with context
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
if m, err := nc.RequestWithContext(ctx, "foo", nil); err != nats.ErrNoResponders {
t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err)
}
}

func TestOldRequest(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
Expand All @@ -586,13 +630,15 @@ func TestOldRequest(t *testing.T) {
errCh := make(chan error, 1)
start := time.Now()
go func() {
sub, _ := nc.SubscribeSync("checkClose")
defer sub.Unsubscribe()
_, err := nc.Request("checkClose", []byte("should be kicked out on close"), time.Second)
errCh <- err
}()
time.Sleep(100 * time.Millisecond)
nc.Close()
if e := <-errCh; e != nats.ErrConnectionClosed {
t.Fatalf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", e)
}
if dur := time.Since(start); dur >= time.Second {
t.Fatalf("Request took too long to bail out: %v", dur)
Expand Down Expand Up @@ -677,6 +723,7 @@ func TestRequestClose(t *testing.T) {
time.Sleep(100 * time.Millisecond)
nc.Close()
}()
nc.SubscribeSync("foo")
if _, err := nc.Request("foo", []byte("help"), 2*time.Second); err != nats.ErrInvalidConnection && err != nats.ErrConnectionClosed {
t.Fatalf("Expected connection error: got %v", err)
}
Expand Down
10 changes: 7 additions & 3 deletions test/conn_test.go
Expand Up @@ -2116,11 +2116,13 @@ func TestGetClientID(t *testing.T) {
optsA := test.DefaultTestOptions
optsA.Port = -1
optsA.Cluster.Port = -1
optsA.Cluster.Name = "test"

srvA := RunServerWithOptions(optsA)
defer srvA.Shutdown()

ch := make(chan bool, 1)
nc1, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvA.Addr().(*net.TCPAddr).Port),
nc1, err := nats.Connect(srvA.ClientURL(),
nats.DiscoveredServersHandler(func(_ *nats.Conn) {
ch <- true
}),
Expand All @@ -2144,13 +2146,15 @@ func TestGetClientID(t *testing.T) {
optsB := test.DefaultTestOptions
optsB.Port = -1
optsB.Cluster.Port = -1
optsB.Cluster.Name = "test"

optsB.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB := RunServerWithOptions(optsB)
defer srvB.Shutdown()

// Wait for the discovered callback to fire
if err := Wait(ch); err != nil {
t.Fatal("Did not the discovered callback")
t.Fatal("Did not fire the discovered callback")
}
// Now check CID should be valid and same as before
newCID, err := nc1.GetClientID()
Expand All @@ -2162,7 +2166,7 @@ func TestGetClientID(t *testing.T) {
}

// Create a client to server B
nc2, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvB.Addr().(*net.TCPAddr).Port))
nc2, err := nats.Connect(srvB.ClientURL())
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion test/context_test.go
Expand Up @@ -297,13 +297,15 @@ func TestContextOldRequestClosed(t *testing.T) {
errCh := make(chan error, 1)
start := time.Now()
go func() {
sub, _ := nc.SubscribeSync("checkClose")
defer sub.Unsubscribe()
_, err = nc.RequestWithContext(ctx, "checkClose", []byte("should be kicked out on close"))
errCh <- err
}()
time.Sleep(100 * time.Millisecond)
nc.Close()
if e := <-errCh; e != nats.ErrConnectionClosed {
t.Fatalf("Unexpected error: %v", err)
t.Fatalf("Unexpected error: %v", e)
}
if dur := time.Since(start); dur >= time.Second {
t.Fatalf("Request took too long to bail out: %v", dur)
Expand Down

0 comments on commit 55956f4

Please sign in to comment.