diff --git a/context.go b/context.go index 769f88a01..4aa3bf0ee 100644 --- a/context.go +++ b/context.go @@ -58,34 +58,37 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [ return nil, ctx.Err() } - nc.mu.Lock() - // If user wants the old style. - if nc.Opts.UseOldRequestStyle { - nc.mu.Unlock() - return nc.oldRequestWithContext(ctx, subj, hdr, data) - } - - mch, token, err := nc.createNewRequestAndSend(subj, hdr, data) - if err != nil { - return nil, err - } + var m *Msg + var err error - var ok bool - var msg *Msg + // If user wants the old style. + if nc.useOldRequestStyle() { + m, err = nc.oldRequestWithContext(ctx, subj, hdr, data) + } else { + mch, token, err := nc.createNewRequestAndSend(subj, hdr, data) + if err != nil { + return nil, err + } - select { - case msg, ok = <-mch: - if !ok { - return nil, ErrConnectionClosed + var ok bool + + select { + case m, ok = <-mch: + if !ok { + return nil, ErrConnectionClosed + } + case <-ctx.Done(): + nc.mu.Lock() + delete(nc.respMap, token) + nc.mu.Unlock() + return nil, ctx.Err() } - case <-ctx.Done(): - nc.mu.Lock() - delete(nc.respMap, token) - nc.mu.Unlock() - return nil, ctx.Err() } - - return msg, nil + // Check for no responder status. + if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { + m, err = nil, ErrNoResponders + } + return m, err } // oldRequestWithContext utilizes inbox and subscription per request. diff --git a/go.mod b/go.mod index 59ee43d35..09bac4d2d 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.14 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 + github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3 github.com/nats-io/nkeys v0.2.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go.sum b/go.sum index f31d0af3b..131113523 100644 --- a/go.sum +++ b/go.sum @@ -7,17 +7,23 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed h1:nnV8Mw23aNwNpKuQWuVBEuAqyBOEY21hLWKpVdNr6dQ= +github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtKbOeM+w3vGQMNF0BEt+2xZDmVCtYXql2Ym+RWg= github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3 h1:aDJ5IrBlq4KHBgwWZtKXi1lvY2EkRYOiC2KQfdLTJL8= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200617224755-fa744fdcdaa3/go.mod h1:uXGA6y1uxwW755SK+LoDZggh+UUVsbVoxh8ZG8MqbsI= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= +github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= @@ -35,6 +41,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/nats.go b/nats.go index daa56fdb7..1e78bf0f4 100644 --- a/nats.go +++ b/nats.go @@ -123,6 +123,7 @@ var ( ErrDisconnected = errors.New("nats: server is disconnected") ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") ErrBadHeaderMsg = errors.New("nats: message could not decode headers") + ErrNoResponders = errors.New("nats: no responders available for request") ) func init() { @@ -589,21 +590,22 @@ const ( ) type connectInfo struct { - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - UserJWT string `json:"jwt,omitempty"` - Nkey string `json:"nkey,omitempty"` - Signature string `json:"sig,omitempty"` - User string `json:"user,omitempty"` - Pass string `json:"pass,omitempty"` - Token string `json:"auth_token,omitempty"` - TLS bool `json:"tls_required"` - Name string `json:"name"` - Lang string `json:"lang"` - Version string `json:"version"` - Protocol int `json:"protocol"` - Echo bool `json:"echo"` - Headers bool `json:"headers"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + UserJWT string `json:"jwt,omitempty"` + Nkey string `json:"nkey,omitempty"` + Signature string `json:"sig,omitempty"` + User string `json:"user,omitempty"` + Pass string `json:"pass,omitempty"` + Token string `json:"auth_token,omitempty"` + TLS bool `json:"tls_required"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + Protocol int `json:"protocol"` + Echo bool `json:"echo"` + Headers bool `json:"headers"` + NoResponders bool `json:"no_responders"` } // MsgHandler is a callback function that processes messages delivered to @@ -1711,8 +1713,10 @@ func (nc *Conn) connectProto() (string, error) { token = nc.Opts.TokenHandler() } + // If our server does not support headers then we can't do them or no responders. + hdrs := nc.info.Headers cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token, - o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true} + o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs} b, err := json.Marshal(cinfo) if err != nil { @@ -2689,21 +2693,28 @@ func NewMsg(subject string) *Msg { } const ( - hdrLine = "NATS/1.0\r\n" - crlf = "\r\n" - hdrPreEnd = len(hdrLine) - len(crlf) + hdrLine = "NATS/1.0\r\n" + crlf = "\r\n" + hdrPreEnd = len(hdrLine) - len(crlf) + statusHdr = "Status" + noResponders = "503" ) // decodeHeadersMsg will decode and headers. func decodeHeadersMsg(data []byte) (http.Header, error) { tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data))) - if l, err := tp.ReadLine(); err != nil || l != hdrLine[:hdrPreEnd] { + l, err := tp.ReadLine() + if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] { return nil, ErrBadHeaderMsg } mh, err := tp.ReadMIMEHeader() if err != nil { return nil, ErrBadHeaderMsg } + // Check if we have an inlined status. + if len(l) > hdrPreEnd { + mh.Add(statusHdr, strings.TrimLeft(l[hdrPreEnd:], " ")) + } return http.Header(mh), nil } @@ -2904,6 +2915,7 @@ func (nc *Conn) respHandler(m *Msg) { // Helper to setup and send new request style requests. Return the chan to receive the response. func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) { + nc.mu.Lock() // Do setup for the new style if needed. if nc.respMap == nil { nc.initNewResp() @@ -2944,7 +2956,6 @@ func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) { if !nc.info.Headers { return nil, ErrHeadersNotSupported } - hdr, err = msg.headerBytes() if err != nil { return nil, err @@ -2960,18 +2971,32 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, return nc.request(subj, nil, data, timeout) } +func (nc *Conn) useOldRequestStyle() bool { + nc.mu.RLock() + r := nc.Opts.UseOldRequestStyle + nc.mu.RUnlock() + return r +} + func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { if nc == nil { return nil, ErrInvalidConnection } - nc.mu.Lock() - if nc.Opts.UseOldRequestStyle { - nc.mu.Unlock() - return nc.oldRequest(subj, hdr, data, timeout) + var m *Msg + var err error + + if nc.useOldRequestStyle() { + m, err = nc.oldRequest(subj, hdr, data, timeout) + } else { + m, err = nc.newRequest(subj, hdr, data, timeout) } - return nc.newRequest(subj, hdr, data, timeout) + // Check for no responder status. + if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { + m, err = nil, ErrNoResponders + } + return m, err } func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { diff --git a/test/basic_test.go b/test/basic_test.go index 4e287729a..1a3134f1c 100644 --- a/test/basic_test.go +++ b/test/basic_test.go @@ -15,6 +15,7 @@ package test import ( "bytes" + "context" "fmt" "math" "regexp" @@ -555,11 +556,54 @@ func TestRequestTimeout(t *testing.T) { nc := NewDefaultConnection(t) defer nc.Close() + // We now need a responder by default otherwise we will get a no responders error. + nc.SubscribeSync("foo") + if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err != nats.ErrTimeout { t.Fatalf("Expected to receive a timeout error") } } +func TestBasicNoRespondersSupport(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Error connecting to server: %v", err) + } + defer nc.Close() + + // Normal new style + if m, err := nc.Request("foo", nil, time.Second); err != nats.ErrNoResponders { + t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err) + } + // New style with context + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if m, err := nc.RequestWithContext(ctx, "foo", nil); err != nats.ErrNoResponders { + t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err) + } + + // Now do old request style as well. + nc, err = nats.Connect(s.ClientURL(), nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Error connecting to server: %v", err) + } + defer nc.Close() + + // Normal old request style + if m, err := nc.Request("foo", nil, time.Second); err != nats.ErrNoResponders { + t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err) + } + // Old request style with context + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + if m, err := nc.RequestWithContext(ctx, "foo", nil); err != nats.ErrNoResponders { + t.Fatalf("Expected a no responders error and nil msg, got m:%+v and err: %v", m, err) + } +} + func TestOldRequest(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() @@ -586,13 +630,15 @@ func TestOldRequest(t *testing.T) { errCh := make(chan error, 1) start := time.Now() go func() { + sub, _ := nc.SubscribeSync("checkClose") + defer sub.Unsubscribe() _, err := nc.Request("checkClose", []byte("should be kicked out on close"), time.Second) errCh <- err }() time.Sleep(100 * time.Millisecond) nc.Close() if e := <-errCh; e != nats.ErrConnectionClosed { - t.Fatalf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", e) } if dur := time.Since(start); dur >= time.Second { t.Fatalf("Request took too long to bail out: %v", dur) @@ -677,6 +723,7 @@ func TestRequestClose(t *testing.T) { time.Sleep(100 * time.Millisecond) nc.Close() }() + nc.SubscribeSync("foo") if _, err := nc.Request("foo", []byte("help"), 2*time.Second); err != nats.ErrInvalidConnection && err != nats.ErrConnectionClosed { t.Fatalf("Expected connection error: got %v", err) } diff --git a/test/conn_test.go b/test/conn_test.go index d03a7c30f..91db87ba9 100644 --- a/test/conn_test.go +++ b/test/conn_test.go @@ -2116,11 +2116,13 @@ func TestGetClientID(t *testing.T) { optsA := test.DefaultTestOptions optsA.Port = -1 optsA.Cluster.Port = -1 + optsA.Cluster.Name = "test" + srvA := RunServerWithOptions(optsA) defer srvA.Shutdown() ch := make(chan bool, 1) - nc1, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvA.Addr().(*net.TCPAddr).Port), + nc1, err := nats.Connect(srvA.ClientURL(), nats.DiscoveredServersHandler(func(_ *nats.Conn) { ch <- true }), @@ -2144,13 +2146,15 @@ func TestGetClientID(t *testing.T) { optsB := test.DefaultTestOptions optsB.Port = -1 optsB.Cluster.Port = -1 + optsB.Cluster.Name = "test" + optsB.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port)) srvB := RunServerWithOptions(optsB) defer srvB.Shutdown() // Wait for the discovered callback to fire if err := Wait(ch); err != nil { - t.Fatal("Did not the discovered callback") + t.Fatal("Did not fire the discovered callback") } // Now check CID should be valid and same as before newCID, err := nc1.GetClientID() @@ -2162,7 +2166,7 @@ func TestGetClientID(t *testing.T) { } // Create a client to server B - nc2, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvB.Addr().(*net.TCPAddr).Port)) + nc2, err := nats.Connect(srvB.ClientURL()) if err != nil { t.Fatalf("Error on connect: %v", err) } diff --git a/test/context_test.go b/test/context_test.go index fdba58c69..9436620bc 100644 --- a/test/context_test.go +++ b/test/context_test.go @@ -297,13 +297,15 @@ func TestContextOldRequestClosed(t *testing.T) { errCh := make(chan error, 1) start := time.Now() go func() { + sub, _ := nc.SubscribeSync("checkClose") + defer sub.Unsubscribe() _, err = nc.RequestWithContext(ctx, "checkClose", []byte("should be kicked out on close")) errCh <- err }() time.Sleep(100 * time.Millisecond) nc.Close() if e := <-errCh; e != nats.ErrConnectionClosed { - t.Fatalf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", e) } if dur := time.Since(start); dur >= time.Second { t.Fatalf("Request took too long to bail out: %v", dur) diff --git a/test/headers_test.go b/test/headers_test.go index 105d75a5e..362f9164a 100644 --- a/test/headers_test.go +++ b/test/headers_test.go @@ -19,7 +19,6 @@ import ( "time" natsserver "github.com/nats-io/nats-server/v2/test" - "github.com/nats-io/nats.go" )