Skip to content

Commit

Permalink
NATS reconnect fully recreates TCP session
Browse files Browse the repository at this point in the history
(cherry picked from commit 4f9e1e5)
  • Loading branch information
Waldz committed Mar 2, 2020
1 parent 8df28e2 commit c7fbce9
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 14 deletions.
1 change: 1 addition & 0 deletions communication/nats/connection_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// Connection represents is publish-subscriber instance which can deliver messages
type Connection interface {
Open() error
Reopen() error
Close()
Check() error
Servers() []string
Expand Down
5 changes: 5 additions & 0 deletions communication/nats/connection_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func (conn *ConnectionMock) Open() error {
return nil
}

// Reopen restarts the connection
func (conn *ConnectionMock) Reopen() error {
return nil
}

// Close destructs the connection
func (conn *ConnectionMock) Close() {
conn.queueShutdown <- true
Expand Down
70 changes: 60 additions & 10 deletions communication/nats/connection_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/url"
"strings"
"sync"
"time"

nats_lib "github.com/nats-io/go-nats"
Expand All @@ -33,7 +34,7 @@ const (
DefaultBrokerPort = 4222
)

// ParseServerURI validates given NATS server address
// ParseServerURI validates given NATS server address.
func ParseServerURI(serverURI string) (*url.URL, error) {
// Add scheme first otherwise serverURL.Parse() fails.
if !strings.HasPrefix(serverURI, "nats:") {
Expand Down Expand Up @@ -68,9 +69,11 @@ func newConnection(serverURIs ...string) (*ConnectionWrap, error) {
return connection, nil
}

// ConnectionWrap defines wrapped connection to NATS server(s)
// ConnectionWrap defines wrapped connection to NATS server(s).
type ConnectionWrap struct {
*nats_lib.Conn
conn *nats_lib.Conn
connLock sync.RWMutex

servers []string
onClose func()
}
Expand All @@ -91,31 +94,78 @@ func (c *ConnectionWrap) connectOptions() nats_lib.Options {
// Open starts the connection: left for test compatibility.
// Deprecated: Use nats.BrokerConnector#Connect() instead.
func (c *ConnectionWrap) Open() (err error) {
c.Conn, err = c.connectOptions().Connect()
c.connLock.Lock()
defer c.connLock.Unlock()

c.conn, err = c.connectOptions().Connect()
if err != nil {
return errors.Wrapf(err, `failed to connect to NATS servers "%v"`, c.servers)
}

return nil
}

// Close destructs the connection
// Reopen restarts the connection.
func (c *ConnectionWrap) Reopen() (err error) {
c.connLock.Lock()
defer c.connLock.Unlock()

if c.conn != nil {
c.conn.Close()
}

c.conn, err = c.connectOptions().Connect()
if err != nil {
return errors.Wrapf(err, `failed to reconnect to NATS servers "%v"`, c.servers)
}

return nil
}

// Close destructs the connection.
func (c *ConnectionWrap) Close() {
if c.Conn != nil {
c.Conn.Close()
c.connLock.Lock()
defer c.connLock.Unlock()

if c.conn != nil {
c.conn.Close()
}
c.onClose()
}

// Check checks the connection
// Check checks the connection.
func (c *ConnectionWrap) Check() error {
// Flush sends ping request and tries to send all cached data.
// It return an error if something wrong happened. All other requests
// will be added to queue to be sent after reconnecting.
return c.Conn.FlushTimeout(3 * time.Second)
return c.conn.FlushTimeout(3 * time.Second)
}

// Servers returns list of currently connected servers
// Servers returns list of currently connected servers.
func (c *ConnectionWrap) Servers() []string {
return c.servers
}

// Publish publishes payload to the given subject.
func (c *ConnectionWrap) Publish(subject string, payload []byte) error {
c.connLock.RLock()
defer c.connLock.RUnlock()

return c.conn.Publish(subject, payload)
}

// Subscribe will express interest in the given subject.
func (c *ConnectionWrap) Subscribe(subject string, handler nats_lib.MsgHandler) (*nats_lib.Subscription, error) {
c.connLock.RLock()
defer c.connLock.RUnlock()

return c.conn.Subscribe(subject, handler)
}

// Request will send a request payload and deliver the response message.
func (c *ConnectionWrap) Request(subject string, payload []byte, timeout time.Duration) (*nats_lib.Msg, error) {
c.connLock.RLock()
defer c.connLock.RUnlock()

return c.conn.Request(subject, payload, timeout)
}
4 changes: 2 additions & 2 deletions communication/nats/connection_wrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func TestParseServerURI(t *testing.T) {
func TestConnectionWrap_NewConnection(t *testing.T) {
connection, err := newConnection("nats://127.0.0.1:4222")
assert.NoError(t, err)
assert.Nil(t, connection.Conn)
assert.Nil(t, connection.conn)
assert.Equal(t, []string{"nats://127.0.0.1:4222"}, connection.servers)

connection, err = newConnection("nats://127.0.0.1")
assert.NoError(t, err)
assert.Nil(t, connection.Conn)
assert.Nil(t, connection.conn)
assert.Equal(t, []string{"nats://127.0.0.1:4222"}, connection.servers)

connection, err = newConnection("nats:// example.com")
Expand Down
7 changes: 5 additions & 2 deletions communication/nats/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ func (b *BrokerConnector) ReconnectAll() {
go func(id uuid.UUID, conn *ConnectionWrap) {
defer wg.Done()
log.Info().Msgf("Re-establishing broker connection %v", id)
err := conn.Conn.Flush()
log.Info().Msgf("Re-establishing broker connection %v DONE (check result=%v)", id, err)
if err := conn.Reopen(); err != nil {
log.Err(err).Msgf("Re-establishing broker connection %v failed", id)
} else {
log.Info().Msgf("Re-establishing broker connection %v DONE", id)
}
}(k, v)
}
wg.Wait()
Expand Down

0 comments on commit c7fbce9

Please sign in to comment.