From 5ecb87d7c777b6d70dd77d45a729374357e4e50c Mon Sep 17 00:00:00 2001 From: Sokolov Yura aka funny_falcon Date: Thu, 1 Dec 2016 17:31:16 +0300 Subject: [PATCH] deadline io use net.Conn.Set(Read|Write)Deadline to fail fast on network unstability. run Pinger goroutine to force some Reads and Writes. --- connection.go | 23 +++++++++++++++++++++-- deadline_io.go | 27 +++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 deadline_io.go diff --git a/connection.go b/connection.go index acab9dc77..2187d02e2 100644 --- a/connection.go +++ b/connection.go @@ -95,6 +95,7 @@ type Greeting struct { // Opts is a way to configure Connection type Opts struct { // Timeout is requests timeout. + // Also used to setup net.TCPConn.Set(Read|Write)Deadline Timeout time.Duration // Reconnect is a pause between reconnection attempts. // If specified, then when tarantool is not reachable or disconnected, @@ -185,6 +186,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { go conn.writer() go conn.reader() + go conn.pinger() if conn.opts.Timeout > 0 { go conn.timeouts() } @@ -238,8 +240,9 @@ func (conn *Connection) dial() (err error) { } c := connection.(*net.TCPConn) c.SetNoDelay(true) - r := bufio.NewReaderSize(c, 128*1024) - w := bufio.NewWriterSize(c, 128*1024) + dc := &DeadlineIO{to: conn.opts.Timeout, c: c} + r := bufio.NewReaderSize(dc, 128*1024) + w := bufio.NewWriterSize(dc, 128*1024) greeting := make([]byte, 128) _, err = io.ReadFull(r, greeting) if err != nil { @@ -419,6 +422,22 @@ func (conn *Connection) closeConnectionForever(err error) error { return err } +func (conn *Connection) pinger() { + if conn.opts.Timeout == 0 { + return + } + t := time.NewTicker(conn.opts.Timeout / 3) + defer t.Stop() + for { + select { + case <-conn.control: + return + case <-t.C: + } + conn.Ping() + } +} + func (conn *Connection) writer() { var w *bufio.Writer var err error diff --git a/deadline_io.go b/deadline_io.go new file mode 100644 index 000000000..65fe4cd9f --- /dev/null +++ b/deadline_io.go @@ -0,0 +1,27 @@ +package tarantool + +import ( + "net" + "time" +) + +type DeadlineIO struct { + to time.Duration + c *net.TCPConn +} + +func (d *DeadlineIO) Write(b []byte) (n int, err error) { + if d.to > 0 { + d.c.SetWriteDeadline(time.Now().Add(d.to)) + } + n, err = d.c.Write(b) + return +} + +func (d *DeadlineIO) Read(b []byte) (n int, err error) { + if d.to > 0 { + d.c.SetReadDeadline(time.Now().Add(d.to)) + } + n, err = d.c.Read(b) + return +}