Merge pull request #988 from nats-io/cluster_raft_tport_timeout
[FIXED] RAFT Transport timeouts
kozlovic authored Dec 11, 2019
2 parents 79fe520 + b4b3716 commit dbd20ed
Showing 3 changed files with 18 additions and 11 deletions.
2 changes: 1 addition & 1 deletion server/clustering.go
@@ -34,7 +34,7 @@ const (
 	defaultRaftElectionTimeout = 2 * time.Second
 	defaultRaftLeaseTimeout    = time.Second
 	defaultRaftCommitTimeout   = 100 * time.Millisecond
-	defaultTPortTimeout        = 2 * time.Second
+	defaultTPortTimeout        = 10 * time.Second
 )
 
 var (
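Note (not part of the diff): defaultTPortTimeout is the fallback that newNATSTransportWithConfig applies when no transport timeout is configured, and it is the value RAFT later uses to call SetDeadline on stream connections (see the new struct comment further down). A minimal sketch of that fallback, assuming a caller inside the same server package; id, natsConn, and the MaxPool value are illustrative:

cfg := &raft.NetworkTransportConfig{
	MaxPool: 3, // illustrative
	Timeout: 0, // zero falls back to defaultTPortTimeout (10 * time.Second after this change)
}
trans, err := newNATSTransportWithConfig(id, natsConn, cfg)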
25 changes: 16 additions & 9 deletions server/raft_transport.go
@@ -32,8 +32,9 @@ import (
 )
 
 const (
-	natsConnectInbox = "raft.%s.accept"
-	natsRequestInbox = "raft.%s.request.%s"
+	natsConnectInbox       = "raft.%s.accept"
+	natsRequestInbox       = "raft.%s.request.%s"
+	timeoutForDialAndFlush = 2 * time.Second
 )
 
 // natsAddr implements the net.Addr interface. An address for the NATS
@@ -182,7 +183,9 @@ type natsStreamLayer struct {
 	logger    *log.Logger
 	conns     map[*natsConn]struct{}
 	mu        sync.Mutex
-	timeout   time.Duration
+	// This is the timeout we will use for flush and dial (request timeout),
+	// not the timeout that RAFT will use to call SetDeadline.
+	dfTimeout time.Duration
 }
 
 func newNATSStreamLayer(id string, conn *nats.Conn, logger *log.Logger, timeout time.Duration) (*natsStreamLayer, error) {
@@ -191,14 +194,18 @@ func newNATSStreamLayer(id string, conn *nats.Conn, logger *log.Logger, timeout
 		conn:      conn,
 		logger:    logger,
 		conns:     map[*natsConn]struct{}{},
-		timeout:   timeout,
+		dfTimeout: timeoutForDialAndFlush,
 	}
+	// Could be the case in tests...
+	if timeout < n.dfTimeout {
+		n.dfTimeout = timeout
+	}
 	sub, err := conn.SubscribeSync(fmt.Sprintf(natsConnectInbox, id))
 	if err != nil {
 		return nil, err
 	}
 	sub.SetPendingLimits(-1, -1)
-	if err := conn.FlushTimeout(timeout); err != nil {
+	if err := conn.FlushTimeout(n.dfTimeout); err != nil {
 		sub.Unsubscribe()
 		return nil, err
 	}
@@ -250,13 +257,13 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration
 		return nil, err
 	}
 	sub.SetPendingLimits(-1, -1)
-	if err := n.conn.FlushTimeout(n.timeout); err != nil {
+	if err := n.conn.FlushTimeout(n.dfTimeout); err != nil {
 		sub.Unsubscribe()
 		return nil, err
 	}
 
 	// Make connect request to peer.
-	msg, err := n.conn.Request(fmt.Sprintf(natsConnectInbox, address), data, timeout)
+	msg, err := n.conn.Request(fmt.Sprintf(natsConnectInbox, address), data, n.dfTimeout)
 	if err != nil {
 		sub.Unsubscribe()
 		return nil, err
@@ -315,7 +322,7 @@ func (n *natsStreamLayer) Accept() (net.Conn, error) {
 			sub.Unsubscribe()
 			continue
 		}
-		if err := n.conn.FlushTimeout(n.timeout); err != nil {
+		if err := n.conn.FlushTimeout(n.dfTimeout); err != nil {
 			n.logger.Printf("[ERR] raft-nats: Failed to flush connect response to remote peer: %v", err)
 			sub.Unsubscribe()
 			continue
@@ -372,7 +379,7 @@ func newNATSTransportWithLogger(id string, conn *nats.Conn, timeout time.Duratio
 // with NATS as the transport layer, using the given config struct.
 func newNATSTransportWithConfig(id string, conn *nats.Conn, config *raft.NetworkTransportConfig) (*raft.NetworkTransport, error) {
 	if config.Timeout == 0 {
-		config.Timeout = 2 * time.Second
+		config.Timeout = defaultTPortTimeout
 	}
 	return createNATSTransport(id, conn, config.Logger, config.Timeout, func(stream raft.StreamLayer) *raft.NetworkTransport {
 		config.Stream = stream
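Note (not part of the diff): the net effect of the newNATSStreamLayer change above is that dial and flush requests against NATS are capped at timeoutForDialAndFlush, and only a shorter RAFT transport timeout (typically only seen in tests) lowers that cap further. A sketch of the selection, with effectiveDialFlushTimeout as a hypothetical helper name:

// effectiveDialFlushTimeout mirrors the dfTimeout selection in newNATSStreamLayer.
func effectiveDialFlushTimeout(raftTimeout time.Duration) time.Duration {
	d := timeoutForDialAndFlush // 2 * time.Second
	if raftTimeout < d {
		d = raftTimeout // e.g. tests that pass a sub-second transport timeout
	}
	return d
}

// effectiveDialFlushTimeout(defaultTPortTimeout)    -> 2 * time.Second
// effectiveDialFlushTimeout(250 * time.Millisecond) -> 250 * time.Millisecond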
2 changes: 1 addition & 1 deletion server/timeout_reader.go
@@ -21,7 +21,7 @@ import (
 	"time"
 )
 
-const bufferSize = 4096
+const bufferSize = 32768
 
 // ErrTimeout reports a read timeout error
 var ErrTimeout = errors.New("natslog: read timeout")
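Note (not part of the diff): bufferSize is the chunk size used by the timeout-aware reader in this file, so a 32 KiB buffer moves the same RAFT traffic in roughly 8x fewer reads than the old 4 KiB. A generic sketch of the kind of deadline-capable read such a buffer feeds; this is an illustration only (it leaks the pending read on timeout for brevity), not the file's actual implementation:

// timeoutRead reads up to bufferSize bytes from r, giving up after d.
func timeoutRead(r io.Reader, d time.Duration) ([]byte, error) {
	type result struct {
		n   int
		err error
	}
	buf := make([]byte, bufferSize) // 32768 after this change
	ch := make(chan result, 1)
	go func() {
		n, err := r.Read(buf)
		ch <- result{n, err}
	}()
	select {
	case res := <-ch:
		return buf[:res.n], res.err
	case <-time.After(d):
		return nil, ErrTimeout
	}
}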
