Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NoEcho Option Support #375

Merged
merged 4 commits into from Jul 25, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions .travis.yml
Expand Up @@ -3,7 +3,6 @@ sudo: false
go:
- 1.10.x
- 1.9.x
- 1.8.x
install:
- go get -t ./...
- go get github.com/nats-io/gnatsd
Expand All @@ -17,5 +16,5 @@ before_script:
- misspell -error -locale US .
- megacheck -ignore "$(cat staticcheck.ignore)" ./...
script:
- go test -i -race ./...
- go test -i -race ./...
- if [[ "$TRAVIS_GO_VERSION" == 1.9.* ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race ./...; fi
34 changes: 29 additions & 5 deletions nats.go
Expand Up @@ -88,6 +88,7 @@ var (
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
)

Expand Down Expand Up @@ -167,6 +168,11 @@ type Options struct {
// server pool.
NoRandomize bool

// NoEcho configures whether the server will echo back messages
// that are sent on this connection if we also have matching subscriptions.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
NoEcho bool

// Name is an optional name label which will be sent to the server
// on CONNECT to identify the client.
Name string
Expand Down Expand Up @@ -408,6 +414,7 @@ type serverInfo struct {
TLSRequired bool `json:"tls_required"`
MaxPayload int64 `json:"max_payload"`
ConnectURLs []string `json:"connect_urls,omitempty"`
Proto int `json:"proto,omitempty"`
}

const (
Expand All @@ -430,6 +437,7 @@ type connectInfo struct {
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
}

// MsgHandler is a callback function that processes messages delivered to
Expand Down Expand Up @@ -539,6 +547,15 @@ func DontRandomize() Option {
}
}

// NoEcho is an Option to turn off messages echoing back from a server.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
func NoEcho() Option {
return func(o *Options) error {
o.NoEcho = true
return nil
}
}

// ReconnectWait is an Option to set the wait time between reconnect attempts.
func ReconnectWait(t time.Duration) Option {
return func(o *Options) error {
Expand Down Expand Up @@ -1044,7 +1061,7 @@ func (nc *Conn) setup() {
// Process a connected connection and initialize properly.
func (nc *Conn) processConnectInit() error {

// Set out deadline for the whole connect process
// Set our deadline for the whole connect process
nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
defer nc.conn.SetDeadline(time.Time{})

Expand Down Expand Up @@ -1203,18 +1220,25 @@ func (nc *Conn) connectProto() (string, error) {
pass, _ = u.Password()
}
} else {
// Take from options (pssibly all empty strings)
// Take from options (possibly all empty strings)
user = nc.Opts.User
pass = nc.Opts.Password
token = nc.Opts.Token
}
cinfo := connectInfo{o.Verbose, o.Pedantic,
user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo}

cinfo := connectInfo{o.Verbose, o.Pedantic, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho}

b, err := json.Marshal(cinfo)
if err != nil {
return _EMPTY_, ErrJsonParse
}

// Check if NoEcho is set and we have a server that supports it.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this test in processExpectedInfo() instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could if you think that is a better place. When forming the connectProto we know we have received the server info and are processing the options to some degree, hence why I stuck it there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine I guess, although you could do this check at top of function, since there is no point in marshalling etc.. if we are going to return an error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted a marshal error to take precedence.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

if o.NoEcho && nc.info.Proto < 1 {
return _EMPTY_, ErrNoEchoNotSupported
}

return fmt.Sprintf(conProto, b), nil
}

Expand Down
58 changes: 58 additions & 0 deletions nats_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1164,3 +1165,60 @@ func TestPingTimerLeakedOnClose(t *testing.T) {
t.Fatal("Pinger timer should not be set")
}
}

func TestNoEcho(t *testing.T) {
s := RunServerOnPort(TEST_PORT)
defer s.Shutdown()

url := fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT)

nc, err := Connect(url, NoEcho())
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

r := int32(0)
_, err = nc.Subscribe("foo", func(m *Msg) {
atomic.AddInt32(&r, 1)
})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

err = nc.Publish("foo", []byte("Hello World"))
if err != nil {
t.Fatalf("Error on publish: %v", err)
}
nc.Flush()
nc.Flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intentional to have 2?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, especially in Travis, this is not enough to guarantee that there is no bug and message is not going to be incorrectly delivered. Maybe you would need 1 flush, but using a different connection, publish an other message, that message should be received, but not the first one.

All that being said, I do believe that the feature is tested in the server. We should focus here in testing APIs related features, which is make sure that the "echo" field is set as expected in the INFO, etc...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the two was intentional instead of having to do a sleep or gosched() call since first may have the scheduler unblock the flush call before firing the async callback. I could make it sync maybe.. Or just comment as to why there are two.


if nr := atomic.LoadInt32(&r); nr != 0 {
t.Fatalf("Expected no messages echoed back, received %d\n", nr)
}
}

func TestNoEchoOldServer(t *testing.T) {
opts := GetDefaultOptions()
opts.Url = DefaultURL
opts.NoEcho = true

nc := &Conn{Opts: opts}
if err := nc.setupServerPool(); err != nil {
t.Fatalf("Problem setting up Server Pool: %v\n", err)
}

// Old style with no proto, meaning 0. We need Proto:1 for NoEcho support.
oldInfo := "{\"server_id\":\"22\",\"version\":\"1.1.0\",\"go\":\"go1.10.2\",\"port\":4222,\"max_payload\":1048576}"

err := nc.processInfo(oldInfo)
if err != nil {
t.Fatalf("Error processing old style INFO: %v\n", err)
}

// Make sure connectProto generates an error.
_, err = nc.connectProto()
if err == nil {
t.Fatalf("Expected an error but got none\n")
}
}
2 changes: 1 addition & 1 deletion test/auth_test.go
Expand Up @@ -186,7 +186,7 @@ func TestPermViolation(t *testing.T) {
opts := test.DefaultTestOptions
opts.Port = 8232
opts.Users = []*server.User{
&server.User{
{
Username: "ivan",
Password: "pwd",
Permissions: &server.Permissions{
Expand Down
8 changes: 4 additions & 4 deletions test/conn_test.go
Expand Up @@ -1364,8 +1364,8 @@ func TestUseCustomDialer(t *testing.T) {
// should take precedence. That means that the connection
// should fail for these two set of options.
options := []*nats.Options{
&nats.Options{Dialer: dialer, CustomDialer: cdialer},
&nats.Options{CustomDialer: cdialer},
{Dialer: dialer, CustomDialer: cdialer},
{CustomDialer: cdialer},
}
for _, o := range options {
o.Servers = []string{nats.DefaultURL}
Expand All @@ -1386,8 +1386,8 @@ func TestUseCustomDialer(t *testing.T) {
}
// Same with variadic
foptions := [][]nats.Option{
[]nats.Option{nats.Dialer(dialer), nats.SetCustomDialer(cdialer)},
[]nats.Option{nats.SetCustomDialer(cdialer)},
{nats.Dialer(dialer), nats.SetCustomDialer(cdialer)},
{nats.SetCustomDialer(cdialer)},
}
for _, fos := range foptions {
nc, err := nats.Connect(nats.DefaultURL, fos...)
Expand Down