From b4b37168f5bb0c1c1cfc0210f30b8fa88e7d6b36 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 11 Dec 2019 13:39:02 -0700 Subject: [PATCH] [FIXED] RAFT Transport timeouts Keep a 2sec for the dial and flush, but increase the timeout for read operations. This will reduce the risk of getting these type of errors: ``` [ERR] STREAM: raft-net: Failed to flush response: write to closed conn ``` Signed-off-by: Ivan Kozlovic --- server/clustering.go | 2 +- server/raft_transport.go | 25 ++++++++++++++++--------- server/timeout_reader.go | 2 +- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/server/clustering.go b/server/clustering.go index 90dd3700..ac52d604 100644 --- a/server/clustering.go +++ b/server/clustering.go @@ -34,7 +34,7 @@ const ( defaultRaftElectionTimeout = 2 * time.Second defaultRaftLeaseTimeout = time.Second defaultRaftCommitTimeout = 100 * time.Millisecond - defaultTPortTimeout = 2 * time.Second + defaultTPortTimeout = 10 * time.Second ) var ( diff --git a/server/raft_transport.go b/server/raft_transport.go index ed6aca84..9dd11ad5 100644 --- a/server/raft_transport.go +++ b/server/raft_transport.go @@ -32,8 +32,9 @@ import ( ) const ( - natsConnectInbox = "raft.%s.accept" - natsRequestInbox = "raft.%s.request.%s" + natsConnectInbox = "raft.%s.accept" + natsRequestInbox = "raft.%s.request.%s" + timeoutForDialAndFlush = 2 * time.Second ) // natsAddr implements the net.Addr interface. An address for the NATS @@ -182,7 +183,9 @@ type natsStreamLayer struct { logger *log.Logger conns map[*natsConn]struct{} mu sync.Mutex - timeout time.Duration + // This is the timeout we will use for flush and dial (request timeout), + // not the timeout that RAFT will use to call SetDeadline. + dfTimeout time.Duration } func newNATSStreamLayer(id string, conn *nats.Conn, logger *log.Logger, timeout time.Duration) (*natsStreamLayer, error) { @@ -191,14 +194,18 @@ func newNATSStreamLayer(id string, conn *nats.Conn, logger *log.Logger, timeout conn: conn, logger: logger, conns: map[*natsConn]struct{}{}, - timeout: timeout, + dfTimeout: timeoutForDialAndFlush, + } + // Could be the case in tests... + if timeout < n.dfTimeout { + n.dfTimeout = timeout } sub, err := conn.SubscribeSync(fmt.Sprintf(natsConnectInbox, id)) if err != nil { return nil, err } sub.SetPendingLimits(-1, -1) - if err := conn.FlushTimeout(timeout); err != nil { + if err := conn.FlushTimeout(n.dfTimeout); err != nil { sub.Unsubscribe() return nil, err } @@ -250,13 +257,13 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration return nil, err } sub.SetPendingLimits(-1, -1) - if err := n.conn.FlushTimeout(n.timeout); err != nil { + if err := n.conn.FlushTimeout(n.dfTimeout); err != nil { sub.Unsubscribe() return nil, err } // Make connect request to peer. - msg, err := n.conn.Request(fmt.Sprintf(natsConnectInbox, address), data, timeout) + msg, err := n.conn.Request(fmt.Sprintf(natsConnectInbox, address), data, n.dfTimeout) if err != nil { sub.Unsubscribe() return nil, err @@ -315,7 +322,7 @@ func (n *natsStreamLayer) Accept() (net.Conn, error) { sub.Unsubscribe() continue } - if err := n.conn.FlushTimeout(n.timeout); err != nil { + if err := n.conn.FlushTimeout(n.dfTimeout); err != nil { n.logger.Printf("[ERR] raft-nats: Failed to flush connect response to remote peer: %v", err) sub.Unsubscribe() continue @@ -372,7 +379,7 @@ func newNATSTransportWithLogger(id string, conn *nats.Conn, timeout time.Duratio // with NATS as the transport layer, using the given config struct. func newNATSTransportWithConfig(id string, conn *nats.Conn, config *raft.NetworkTransportConfig) (*raft.NetworkTransport, error) { if config.Timeout == 0 { - config.Timeout = 2 * time.Second + config.Timeout = defaultTPortTimeout } return createNATSTransport(id, conn, config.Logger, config.Timeout, func(stream raft.StreamLayer) *raft.NetworkTransport { config.Stream = stream diff --git a/server/timeout_reader.go b/server/timeout_reader.go index 768b6517..c62200f5 100644 --- a/server/timeout_reader.go +++ b/server/timeout_reader.go @@ -21,7 +21,7 @@ import ( "time" ) -const bufferSize = 4096 +const bufferSize = 32768 // ErrTimeout reports a read timeout error var ErrTimeout = errors.New("natslog: read timeout")