From ba675d54f44acefe597ff6dc2f70a6ed13544cf1 Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Thu, 11 Apr 2019 13:01:38 -0600 Subject: [PATCH] socket: support network IO deadline (#339) This allows connection-oriented protocols to recover from bad network conditions, by enabling to set a limit on the time for network I/O. As a special case, it allows client/servers to recover from a bad connection during the initial TLS handshake. --- conn.go | 23 +++++++++++++++++++++++ core.go | 14 ++++++++++++-- options.go | 7 +++++++ transport/tlstcp/tlstcp.go | 27 ++++++++++++++++++++------- 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/conn.go b/conn.go index e714a7c..79af76a 100644 --- a/conn.go +++ b/conn.go @@ -19,6 +19,7 @@ import ( "io" "net" "sync" + "time" ) // conn implements the Pipe interface on top of net.Conn. The @@ -49,6 +50,10 @@ func (p *conn) Recv() (*Message, error) { var err error var msg *Message + if err := p.setNetworkTimeout(); err != nil { + return nil, err + } + if err = binary.Read(p.c, binary.BigEndian, &sz); err != nil { return nil, err } @@ -78,6 +83,10 @@ func (p *conn) Send(msg *Message) error { return nil } + if err := p.setNetworkTimeout(); err != nil { + return err + } + // send length header if err := binary.Write(p.c, binary.BigEndian, l); err != nil { return err @@ -181,6 +190,10 @@ func (p *conn) handshake(props []interface{}) error { p.maxrx = int64(v.(int)) } + if err := p.setNetworkTimeout(); err != nil { + return err + } + h := connHeader{S: 'S', P: 'P', Proto: p.proto.Number()} if err = binary.Write(p.c, binary.BigEndian, &h); err != nil { return err @@ -207,3 +220,13 @@ func (p *conn) handshake(props []interface{}) error { p.open = true return nil } + +func (p *conn) setNetworkTimeout() error { + if v, err := p.sock.GetOption(OptionNetworkIoDeadline); err == nil { + // Socket implementation ensures that option type is time.Duration. + if iotimeout := v.(time.Duration); iotimeout > 0 { + return p.c.SetDeadline(time.Now().Add(iotimeout)) + } + } + return nil +} diff --git a/core.go b/core.go index 05e8e93..bee0945 100644 --- a/core.go +++ b/core.go @@ -46,8 +46,9 @@ type socket struct { recverr error // error to return on attempts to Recv() senderr error // error to return on attempts to Send() - rdeadline time.Duration - wdeadline time.Duration + rdeadline time.Duration // mangos socket read deadline + wdeadline time.Duration // mangos socket write deadline + iodeadline time.Duration // IO timeout for connection used by pipe reconntime time.Duration // reconnect time after error or disconnect reconnmax time.Duration // max reconnect interval linger time.Duration @@ -437,6 +438,11 @@ func (sock *socket) SetOption(name string, value interface{}) error { sock.wdeadline = value.(time.Duration) sock.Unlock() return nil + case OptionNetworkIoDeadline: + sock.Lock() + sock.iodeadline = value.(time.Duration) + sock.Unlock() + return nil case OptionLinger: sock.Lock() sock.linger = value.(time.Duration) @@ -523,6 +529,10 @@ func (sock *socket) GetOption(name string) (interface{}, error) { sock.Lock() defer sock.Unlock() return sock.wdeadline, nil + case OptionNetworkIoDeadline: + sock.Lock() + defer sock.Unlock() + return sock.iodeadline, nil case OptionLinger: sock.Lock() defer sock.Unlock() diff --git a/options.go b/options.go index e96f27f..1e1783a 100644 --- a/options.go +++ b/options.go @@ -39,6 +39,13 @@ const ( // non-blocking operation. By default there is no timeout. OptionSendDeadline = "SEND-DEADLINE" + // OptionNetworkIoDeadline enables recovery from bad connections for + // connection-based transports; by limiting the time until a (read or + // write) network operation times out. The value is a time.Duration. + // Setting this option is recommended for connection-based protocols, + // it is disabled by default. + OptionNetworkIoDeadline = "NETWORK-IO-DEADLINE" + // OptionRetryTime is used by REQ. The argument is a time.Duration. // When a request has not been replied to within the given duration, // the request will automatically be resent to an available peer. diff --git a/transport/tlstcp/tlstcp.go b/transport/tlstcp/tlstcp.go index b669ff8..b6898b8 100644 --- a/transport/tlstcp/tlstcp.go +++ b/transport/tlstcp/tlstcp.go @@ -85,15 +85,28 @@ func (o options) configTCP(conn *net.TCPConn) error { return err } } + if v, ok := o[mangos.OptionNetworkIoDeadline]; ok { + if err := conn.SetDeadline(time.Now().Add(v.(time.Duration))); err != nil { + return err + } + } return nil } -func newOptions(t *tlsTran) options { - o := make(map[string]interface{}) - o[mangos.OptionTLSConfig] = t.config - o[mangos.OptionNoDelay] = true - o[mangos.OptionKeepAlive] = true +func newOptions(sock mangos.Socket, t *tlsTran) options { + var o = map[string]interface{}{ + mangos.OptionTLSConfig: t.config, + mangos.OptionNoDelay: true, + mangos.OptionKeepAlive: true, + } + + // I/O timeout is needed to avoid blocking on the initial TLS handshake (#339). + if v, err := sock.GetOption(mangos.OptionNetworkIoDeadline); err == nil { + if iotimeout := v.(time.Duration); iotimeout > 0 { + o[mangos.OptionNetworkIoDeadline] = iotimeout + } + } return options(o) } @@ -235,7 +248,7 @@ func (t *tlsTran) NewDialer(addr string, sock mangos.Socket) (mangos.PipeDialer, return nil, err } - d := &dialer{sock: sock, opts: newOptions(t), addr: addr} + d := &dialer{sock: sock, opts: newOptions(sock, t), addr: addr} return d, nil } @@ -243,7 +256,7 @@ func (t *tlsTran) NewDialer(addr string, sock mangos.Socket) (mangos.PipeDialer, // NewListener implements the Transport NewAccepter method. func (t *tlsTran) NewListener(addr string, sock mangos.Socket) (mangos.PipeListener, error) { var err error - l := &listener{sock: sock, opts: newOptions(t)} + l := &listener{sock: sock, opts: newOptions(sock, t)} if addr, err = mangos.StripScheme(t, addr); err != nil { return nil, err