Skip to content

Commit

Permalink
Revert "NATS reconnect fully recreates TCP session"
Browse files Browse the repository at this point in the history
This reverts commit c7fbce9.
  • Loading branch information
vkuznecovas committed Mar 2, 2020
1 parent d4b5648 commit 31b3965
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 73 deletions.
1 change: 0 additions & 1 deletion communication/nats/connection_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
// Connection represents is publish-subscriber instance which can deliver messages
type Connection interface {
Open() error
Reopen() error
Close()
Check() error
Servers() []string
Expand Down
5 changes: 0 additions & 5 deletions communication/nats/connection_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ func (conn *ConnectionMock) Open() error {
return nil
}

// Reopen restarts the connection
func (conn *ConnectionMock) Reopen() error {
return nil
}

// Close destructs the connection
func (conn *ConnectionMock) Close() {
conn.queueShutdown <- true
Expand Down
70 changes: 10 additions & 60 deletions communication/nats/connection_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"net/url"
"strings"
"sync"
"time"

nats_lib "github.com/nats-io/go-nats"
Expand All @@ -34,7 +33,7 @@ const (
DefaultBrokerPort = 4222
)

// ParseServerURI validates given NATS server address.
// ParseServerURI validates given NATS server address
func ParseServerURI(serverURI string) (*url.URL, error) {
// Add scheme first otherwise serverURL.Parse() fails.
if !strings.HasPrefix(serverURI, "nats:") {
Expand Down Expand Up @@ -69,11 +68,9 @@ func newConnection(serverURIs ...string) (*ConnectionWrap, error) {
return connection, nil
}

// ConnectionWrap defines wrapped connection to NATS server(s).
// ConnectionWrap defines wrapped connection to NATS server(s)
type ConnectionWrap struct {
conn *nats_lib.Conn
connLock sync.RWMutex

*nats_lib.Conn
servers []string
onClose func()
}
Expand All @@ -94,78 +91,31 @@ func (c *ConnectionWrap) connectOptions() nats_lib.Options {
// Open starts the connection: left for test compatibility.
// Deprecated: Use nats.BrokerConnector#Connect() instead.
func (c *ConnectionWrap) Open() (err error) {
c.connLock.Lock()
defer c.connLock.Unlock()

c.conn, err = c.connectOptions().Connect()
c.Conn, err = c.connectOptions().Connect()
if err != nil {
return errors.Wrapf(err, `failed to connect to NATS servers "%v"`, c.servers)
}

return nil
}

// Reopen restarts the connection.
func (c *ConnectionWrap) Reopen() (err error) {
c.connLock.Lock()
defer c.connLock.Unlock()

if c.conn != nil {
c.conn.Close()
}

c.conn, err = c.connectOptions().Connect()
if err != nil {
return errors.Wrapf(err, `failed to reconnect to NATS servers "%v"`, c.servers)
}

return nil
}

// Close destructs the connection.
// Close destructs the connection
func (c *ConnectionWrap) Close() {
c.connLock.Lock()
defer c.connLock.Unlock()

if c.conn != nil {
c.conn.Close()
if c.Conn != nil {
c.Conn.Close()
}
c.onClose()
}

// Check checks the connection.
// Check checks the connection
func (c *ConnectionWrap) Check() error {
// Flush sends ping request and tries to send all cached data.
// It return an error if something wrong happened. All other requests
// will be added to queue to be sent after reconnecting.
return c.conn.FlushTimeout(3 * time.Second)
return c.Conn.FlushTimeout(3 * time.Second)
}

// Servers returns list of currently connected servers.
// Servers returns list of currently connected servers
func (c *ConnectionWrap) Servers() []string {
return c.servers
}

// Publish publishes payload to the given subject.
func (c *ConnectionWrap) Publish(subject string, payload []byte) error {
c.connLock.RLock()
defer c.connLock.RUnlock()

return c.conn.Publish(subject, payload)
}

// Subscribe will express interest in the given subject.
func (c *ConnectionWrap) Subscribe(subject string, handler nats_lib.MsgHandler) (*nats_lib.Subscription, error) {
c.connLock.RLock()
defer c.connLock.RUnlock()

return c.conn.Subscribe(subject, handler)
}

// Request will send a request payload and deliver the response message.
func (c *ConnectionWrap) Request(subject string, payload []byte, timeout time.Duration) (*nats_lib.Msg, error) {
c.connLock.RLock()
defer c.connLock.RUnlock()

return c.conn.Request(subject, payload, timeout)
}
4 changes: 2 additions & 2 deletions communication/nats/connection_wrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func TestParseServerURI(t *testing.T) {
func TestConnectionWrap_NewConnection(t *testing.T) {
connection, err := newConnection("nats://127.0.0.1:4222")
assert.NoError(t, err)
assert.Nil(t, connection.conn)
assert.Nil(t, connection.Conn)
assert.Equal(t, []string{"nats://127.0.0.1:4222"}, connection.servers)

connection, err = newConnection("nats://127.0.0.1")
assert.NoError(t, err)
assert.Nil(t, connection.conn)
assert.Nil(t, connection.Conn)
assert.Equal(t, []string{"nats://127.0.0.1:4222"}, connection.servers)

connection, err = newConnection("nats:// example.com")
Expand Down
7 changes: 2 additions & 5 deletions communication/nats/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ func (b *BrokerConnector) ReconnectAll() {
go func(id uuid.UUID, conn *ConnectionWrap) {
defer wg.Done()
log.Info().Msgf("Re-establishing broker connection %v", id)
if err := conn.Reopen(); err != nil {
log.Err(err).Msgf("Re-establishing broker connection %v failed", id)
} else {
log.Info().Msgf("Re-establishing broker connection %v DONE", id)
}
err := conn.Conn.Flush()
log.Info().Msgf("Re-establishing broker connection %v DONE (check result=%v)", id, err)
}(k, v)
}
wg.Wait()
Expand Down

0 comments on commit 31b3965

Please sign in to comment.