Skip to content

Commit

Permalink
Merge pull request #375 from nats-io/noecho
Browse files Browse the repository at this point in the history
NoEcho Option Support
  • Loading branch information
derekcollison committed Jul 25, 2018
2 parents 9f62858 + e04b7c4 commit ff578ff
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 13 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Expand Up @@ -3,7 +3,6 @@ sudo: false
go:
- 1.10.x
- 1.9.x
- 1.8.x
install:
- go get -t ./...
- go get github.com/nats-io/gnatsd
Expand All @@ -17,5 +16,5 @@ before_script:
- misspell -error -locale US .
- megacheck -ignore "$(cat staticcheck.ignore)" ./...
script:
- go test -i -race ./...
- go test -i -race ./...
- if [[ "$TRAVIS_GO_VERSION" == 1.9.* ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race ./...; fi
36 changes: 30 additions & 6 deletions nats.go
Expand Up @@ -40,7 +40,7 @@ import (

// Default Constants
const (
Version = "1.5.0"
Version = "1.6.0"
DefaultURL = "nats://localhost:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
Expand Down Expand Up @@ -88,6 +88,7 @@ var (
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
)

Expand Down Expand Up @@ -167,6 +168,11 @@ type Options struct {
// server pool.
NoRandomize bool

// NoEcho configures whether the server will echo back messages
// that are sent on this connection if we also have matching subscriptions.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
NoEcho bool

// Name is an optional name label which will be sent to the server
// on CONNECT to identify the client.
Name string
Expand Down Expand Up @@ -408,6 +414,7 @@ type serverInfo struct {
TLSRequired bool `json:"tls_required"`
MaxPayload int64 `json:"max_payload"`
ConnectURLs []string `json:"connect_urls,omitempty"`
Proto int `json:"proto,omitempty"`
}

const (
Expand All @@ -430,6 +437,7 @@ type connectInfo struct {
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
}

// MsgHandler is a callback function that processes messages delivered to
Expand Down Expand Up @@ -539,6 +547,15 @@ func DontRandomize() Option {
}
}

// NoEcho is an Option to turn off messages echoing back from a server.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
func NoEcho() Option {
return func(o *Options) error {
o.NoEcho = true
return nil
}
}

// ReconnectWait is an Option to set the wait time between reconnect attempts.
func ReconnectWait(t time.Duration) Option {
return func(o *Options) error {
Expand Down Expand Up @@ -1044,7 +1061,7 @@ func (nc *Conn) setup() {
// Process a connected connection and initialize properly.
func (nc *Conn) processConnectInit() error {

// Set out deadline for the whole connect process
// Set our deadline for the whole connect process
nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
defer nc.conn.SetDeadline(time.Time{})

Expand Down Expand Up @@ -1203,18 +1220,25 @@ func (nc *Conn) connectProto() (string, error) {
pass, _ = u.Password()
}
} else {
// Take from options (pssibly all empty strings)
// Take from options (possibly all empty strings)
user = nc.Opts.User
pass = nc.Opts.Password
token = nc.Opts.Token
}
cinfo := connectInfo{o.Verbose, o.Pedantic,
user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo}

cinfo := connectInfo{o.Verbose, o.Pedantic, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho}

b, err := json.Marshal(cinfo)
if err != nil {
return _EMPTY_, ErrJsonParse
}

// Check if NoEcho is set and we have a server that supports it.
if o.NoEcho && nc.info.Proto < 1 {
return _EMPTY_, ErrNoEchoNotSupported
}

return fmt.Sprintf(conProto, b), nil
}

Expand Down
58 changes: 58 additions & 0 deletions nats_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1164,3 +1165,60 @@ func TestPingTimerLeakedOnClose(t *testing.T) {
t.Fatal("Pinger timer should not be set")
}
}

func TestNoEcho(t *testing.T) {
s := RunServerOnPort(TEST_PORT)
defer s.Shutdown()

url := fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT)

nc, err := Connect(url, NoEcho())
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

r := int32(0)
_, err = nc.Subscribe("foo", func(m *Msg) {
atomic.AddInt32(&r, 1)
})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

err = nc.Publish("foo", []byte("Hello World"))
if err != nil {
t.Fatalf("Error on publish: %v", err)
}
nc.Flush()
nc.Flush()

if nr := atomic.LoadInt32(&r); nr != 0 {
t.Fatalf("Expected no messages echoed back, received %d\n", nr)
}
}

func TestNoEchoOldServer(t *testing.T) {
opts := GetDefaultOptions()
opts.Url = DefaultURL
opts.NoEcho = true

nc := &Conn{Opts: opts}
if err := nc.setupServerPool(); err != nil {
t.Fatalf("Problem setting up Server Pool: %v\n", err)
}

// Old style with no proto, meaning 0. We need Proto:1 for NoEcho support.
oldInfo := "{\"server_id\":\"22\",\"version\":\"1.1.0\",\"go\":\"go1.10.2\",\"port\":4222,\"max_payload\":1048576}"

err := nc.processInfo(oldInfo)
if err != nil {
t.Fatalf("Error processing old style INFO: %v\n", err)
}

// Make sure connectProto generates an error.
_, err = nc.connectProto()
if err == nil {
t.Fatalf("Expected an error but got none\n")
}
}
2 changes: 1 addition & 1 deletion test/auth_test.go
Expand Up @@ -186,7 +186,7 @@ func TestPermViolation(t *testing.T) {
opts := test.DefaultTestOptions
opts.Port = 8232
opts.Users = []*server.User{
&server.User{
{
Username: "ivan",
Password: "pwd",
Permissions: &server.Permissions{
Expand Down
8 changes: 4 additions & 4 deletions test/conn_test.go
Expand Up @@ -1364,8 +1364,8 @@ func TestUseCustomDialer(t *testing.T) {
// should take precedence. That means that the connection
// should fail for these two set of options.
options := []*nats.Options{
&nats.Options{Dialer: dialer, CustomDialer: cdialer},
&nats.Options{CustomDialer: cdialer},
{Dialer: dialer, CustomDialer: cdialer},
{CustomDialer: cdialer},
}
for _, o := range options {
o.Servers = []string{nats.DefaultURL}
Expand All @@ -1386,8 +1386,8 @@ func TestUseCustomDialer(t *testing.T) {
}
// Same with variadic
foptions := [][]nats.Option{
[]nats.Option{nats.Dialer(dialer), nats.SetCustomDialer(cdialer)},
[]nats.Option{nats.SetCustomDialer(cdialer)},
{nats.Dialer(dialer), nats.SetCustomDialer(cdialer)},
{nats.SetCustomDialer(cdialer)},
}
for _, fos := range foptions {
nc, err := nats.Connect(nats.DefaultURL, fos...)
Expand Down

0 comments on commit ff578ff

Please sign in to comment.