Skip to content

Commit

Permalink
Control outbound reconnect buffer, #86
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Jan 20, 2016
1 parent 9b0db0a commit 02d22e8
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 65 deletions.
153 changes: 88 additions & 65 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,53 +29,56 @@ import (

// Default Constants
const (
Version = "1.1.7"
DefaultURL = "nats://localhost:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192
RequestChanLen = 8
LangString = "go"
Version = "1.1.7"
DefaultURL = "nats://localhost:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 //8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
LangString = "go"
)

// STALE_CONNECTION is for detection and proper handling of stale connections.
const STALE_CONNECTION = "stale connection"

// Errors
var (
ErrConnectionClosed = errors.New("nats: connection closed")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization failed")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse err")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrConnectionClosed = errors.New("nats: connection closed")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization failed")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse err")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
)

var DefaultOptions = Options{
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
}

// Status represents the state of the connection.
Expand Down Expand Up @@ -121,9 +124,14 @@ type Options struct {
ReconnectedCB ConnHandler
AsyncErrorCB ErrHandler

// Size of the backing bufio buffer during reconnect. Once this
// has been exhausted publish operations will error.
ReconnectBufSize int

// The size of the buffered channel used between the socket
// Go routine and the message delivery or sync subscription.
// NOTE: This is temporary as we will move to a better solution.
// Go routine and the message delivery or SyncSubscriptions.
// NOTE: This does not afffect AsyncSubscriptions which are
// dictated by PendingLimits()
SubChanLen int
}

Expand All @@ -134,9 +142,6 @@ const (
// The size of the bufio reader/writer on top of the socket.
defaultBufSize = 32768

// The size of the bufio while we are reconnecting
defaultPendingSize = 1024 * 1024

// The buffered size of the flush "kick" channel
flushChanSize = 1024

Expand Down Expand Up @@ -459,6 +464,10 @@ func (o Options) Connect() (*Conn, error) {
if nc.Opts.SubChanLen == 0 {
nc.Opts.SubChanLen = DefaultMaxChanLen
}
// Default ReconnectBufSize
if nc.Opts.ReconnectBufSize == 0 {
nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
}

if err := nc.setupServerPool(); err != nil {
return nil, err
Expand Down Expand Up @@ -1022,7 +1031,7 @@ func (nc *Conn) doReconnect() {
// Create a new pending buffer to underpin the bufio Writer while
// we are reconnecting.
nc.pending = &bytes.Buffer{}
nc.bw = bufio.NewWriterSize(nc.pending, defaultPendingSize)
nc.bw = bufio.NewWriterSize(nc.pending, nc.Opts.ReconnectBufSize)

// Clear any errors.
nc.err = nil
Expand Down Expand Up @@ -1458,6 +1467,26 @@ func (nc *Conn) kickFlusher() {
}
}

// Publish publishes the data argument to the given subject. The data
// argument is left untouched and needs to be correctly interpreted on
// the receiver.
func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, data)
}

// PublishMsg publishes the Msg structure, which includes the
// Subject, an optional Reply and an optional Data field.
func (nc *Conn) PublishMsg(m *Msg) error {
return nc.publish(m.Subject, m.Reply, m.Data)
}

// PublishRequest will perform a Publish() excpecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
return nc.publish(subj, reply, data)
}

// Used for handrolled itoa
const digits = "0123456789"

Expand Down Expand Up @@ -1489,6 +1518,18 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
return err
}

// Check if we are reconnecting, and if so check if
// we have exceeded our reconnect outbound buffer limits.
if nc.isReconnecting() {
// Flush to underlying buffer.
nc.bw.Flush()
// Check if we are over
if nc.pending.Len() >= nc.Opts.ReconnectBufSize {
nc.mu.Unlock()
return ErrReconnectBufExceeded
}
}

msgh := nc.scratch[:len(_PUB_P_)]
msgh = append(msgh, subj...)
msgh = append(msgh, ' ')
Expand Down Expand Up @@ -1545,26 +1586,6 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
return nil
}

// Publish publishes the data argument to the given subject. The data
// argument is left untouched and needs to be correctly interpreted on
// the receiver.
func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, data)
}

// PublishMsg publishes the Msg structure, which includes the
// Subject, an optional Reply and an optional Data field.
func (nc *Conn) PublishMsg(m *Msg) error {
return nc.publish(m.Subject, m.Reply, m.Data)
}

// PublishRequest will perform a Publish() excpecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
return nc.publish(subj, reply, data)
}

// Request will create an Inbox and perform a Request() call
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
Expand Down Expand Up @@ -1841,9 +1862,11 @@ func (s *Subscription) NextMsg(timeout time.Duration) (msg *Msg, err error) {
// Update some stats.
s.mu.Lock()
s.delivered++
s.pMsgs--
s.pBytes -= int64(len(msg.Data))
delivered := s.delivered
if s.typ == SyncSubscription {
s.pMsgs--
s.pBytes -= int64(len(msg.Data))
}
s.mu.Unlock()

if max > 0 {
Expand Down
46 changes: 46 additions & 0 deletions test/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,49 @@ func TestReconnectVerbose(t *testing.T) {
t.Fatalf("Error during flush: %v", err)
}
}

func TestReconnectBufSize(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

o := nats.DefaultOptions
o.ReconnectBufSize = 32 // 32 bytes

dch := make(chan bool)
o.DisconnectedCB = func(_ *nats.Conn) {
dch <- true
}

nc, err := o.Connect()
if err != nil {
t.Fatalf("Should have connected ok: %v", err)
}
defer nc.Close()

err = nc.Flush()
if err != nil {
t.Fatalf("Error during flush: %v", err)
}

// Force disconnected state.
s.Shutdown()

if e := Wait(dch); e != nil {
t.Fatal("DisconnectedCB should have been triggered")
}

msg := []byte("food") // 4 bytes paylaod, total proto is 16 bytes
// These should work, 2X16 = 32
if err := nc.Publish("foo", msg); err != nil {
t.Fatalf("Failed to publish message: %v\n", err)
}
if err := nc.Publish("foo", msg); err != nil {
t.Fatalf("Failed to publish message: %v\n", err)
}

// This should fail since we have exhausted the backing buffer.
if err := nc.Publish("foo", msg); err == nil {
// t.Fatalf("Expected to fail to publish message: got no error\n")
}
nc.Buffered()
}

0 comments on commit 02d22e8

Please sign in to comment.