Skip to content

Commit

Permalink
deadline io
Browse files Browse the repository at this point in the history
use net.Conn.Set(Read|Write)Deadline to fail fast on network unstability.
run Pinger goroutine to force some Reads and Writes.
  • Loading branch information
funny-falcon committed Dec 1, 2016
1 parent 3db5482 commit 5ecb87d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
23 changes: 21 additions & 2 deletions connection.go
Expand Up @@ -95,6 +95,7 @@ type Greeting struct {
// Opts is a way to configure Connection
type Opts struct {
// Timeout is requests timeout.
// Also used to setup net.TCPConn.Set(Read|Write)Deadline
Timeout time.Duration
// Reconnect is a pause between reconnection attempts.
// If specified, then when tarantool is not reachable or disconnected,
Expand Down Expand Up @@ -185,6 +186,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {

go conn.writer()
go conn.reader()
go conn.pinger()
if conn.opts.Timeout > 0 {
go conn.timeouts()
}
Expand Down Expand Up @@ -238,8 +240,9 @@ func (conn *Connection) dial() (err error) {
}
c := connection.(*net.TCPConn)
c.SetNoDelay(true)
r := bufio.NewReaderSize(c, 128*1024)
w := bufio.NewWriterSize(c, 128*1024)
dc := &DeadlineIO{to: conn.opts.Timeout, c: c}
r := bufio.NewReaderSize(dc, 128*1024)
w := bufio.NewWriterSize(dc, 128*1024)
greeting := make([]byte, 128)
_, err = io.ReadFull(r, greeting)
if err != nil {
Expand Down Expand Up @@ -419,6 +422,22 @@ func (conn *Connection) closeConnectionForever(err error) error {
return err
}

func (conn *Connection) pinger() {
if conn.opts.Timeout == 0 {
return
}
t := time.NewTicker(conn.opts.Timeout / 3)
defer t.Stop()
for {
select {
case <-conn.control:
return
case <-t.C:
}
conn.Ping()
}
}

func (conn *Connection) writer() {
var w *bufio.Writer
var err error
Expand Down
27 changes: 27 additions & 0 deletions deadline_io.go
@@ -0,0 +1,27 @@
package tarantool

import (
"net"
"time"
)

type DeadlineIO struct {
to time.Duration
c *net.TCPConn
}

func (d *DeadlineIO) Write(b []byte) (n int, err error) {
if d.to > 0 {
d.c.SetWriteDeadline(time.Now().Add(d.to))
}
n, err = d.c.Write(b)
return
}

func (d *DeadlineIO) Read(b []byte) (n int, err error) {
if d.to > 0 {
d.c.SetReadDeadline(time.Now().Add(d.to))
}
n, err = d.c.Read(b)
return
}

0 comments on commit 5ecb87d

Please sign in to comment.