Skip to content

Commit

Permalink
reversion to decay method and global ping
Browse files Browse the repository at this point in the history
  • Loading branch information
boj committed Aug 6, 2013
1 parent 6a492ff commit c46701f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 54 deletions.
11 changes: 11 additions & 0 deletions client.go
Expand Up @@ -2,6 +2,7 @@ package riakpbc

import (
"log"
"time"
)

type Client struct {
Expand Down Expand Up @@ -44,6 +45,7 @@ func (c *Client) Dial() error {
for _, node := range c.pool.nodes {
err := node.Dial()
if err != nil {
node.RecordError(10.0)
if c.LoggingEnabled() {
log.Print("[POOL] Error: ", err)
}
Expand All @@ -58,6 +60,8 @@ func (c *Client) Dial() error {
return ErrZeroNodes
}

go c.BackgroundNodePing()

return nil
}

Expand All @@ -66,6 +70,13 @@ func (c *Client) Close() {
c.pool.Close()
}

func (c *Client) BackgroundNodePing() {
for {
time.Sleep(time.Duration(c.pingFrequency) * time.Millisecond)
c.pool.Ping()
}
}

// SelectNode selects a node from the pool, see *Pool.SelectNode()
func (c *Client) SelectNode() (*Node, error) {
return c.pool.SelectNode()
Expand Down
70 changes: 26 additions & 44 deletions node.go
Expand Up @@ -16,14 +16,14 @@ type Node struct {
conn *net.TCPConn
readTimeout time.Duration
writeTimeout time.Duration
retryTimeout time.Duration
errorRate *Decaying
ok bool
oklock *sync.Mutex
sync.Mutex
}

// Returns a new Node.
func NewNode(addr string, readTimeout, writeTimeout, retryTimeout time.Duration) (*Node, error) {
func NewNode(addr string, readTimeout, writeTimeout time.Duration) (*Node, error) {
tcpaddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
Expand All @@ -34,27 +34,37 @@ func NewNode(addr string, readTimeout, writeTimeout, retryTimeout time.Duration)
tcpAddr: tcpaddr,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
retryTimeout: retryTimeout,
ok: false,
errorRate: NewDecaying(),
ok: true,
oklock: &sync.Mutex{},
}

return node, nil
}

// Dial connects to a single riak node. A Node is not OK until it has successfully dialed.
// Dial connects to a single riak node.
func (node *Node) Dial() (err error) {
node.conn, err = net.DialTCP("tcp", nil, node.tcpAddr)
if err != nil {
node.RecordError()
return err
}
node.SetOk(true)

node.conn.SetKeepAlive(true)

return nil
}

// ErrorRate safely returns the current Node's error rate.
func (node *Node) ErrorRate() float64 {
return node.errorRate.Value()
}

// RecordError sets the Node into a state check. The Node reports itself as unavailable in the interim.
func (node *Node) RecordError(amount float64) {
node.SetOk(false)
node.errorRate.Add(amount)
}

func (node *Node) GetOk() bool {
var out bool
node.oklock.Lock()
Expand All @@ -69,35 +79,6 @@ func (node *Node) SetOk(ok bool) {
node.oklock.Unlock()
}

// RecordError sets the Node into a redial state. The Node reports itself as down until it has redialed.
func (node *Node) RecordError() {
if node.GetOk() {
node.SetOk(false)
go node.BackgroundRedial()
}
}

// BackgroundRedial continues to redial the Node in the background every retryTimeout, up to NODE_DOWN_MAX_RETRY.
func (node *Node) BackgroundRedial() {
node.Lock()
time.Sleep(node.retryTimeout)
node.Unlock()

if err := node.Dial(); err == nil {
node.Lock()
node.retryTimeout = NODE_DOWN_RETRY
node.Unlock()
return
}

node.Lock()
if node.retryTimeout < NODE_DOWN_MAX_RETRY {
node.retryTimeout += NODE_DOWN_RETRY_INCREMET
}
node.Unlock()
go node.BackgroundRedial()
}

func (node *Node) ReqResp(reqstruct interface{}, structname string, raw bool) (response interface{}, err error) {
node.Lock()
if raw == true {
Expand Down Expand Up @@ -171,6 +152,7 @@ func (node *Node) Close() {

func (node *Node) write(formattedRequest []byte) (err error) {
node.conn.SetWriteDeadline(time.Now().Add(node.readTimeout))

_, err = node.conn.Write(formattedRequest)
if err != nil {
return err
Expand All @@ -197,7 +179,7 @@ func (node *Node) read() (respraw []byte, err error) {
// read rest of message
m, err := io.ReadFull(node.conn, data)
if err != nil {
node.RecordError()
node.RecordError(1.0)
return nil, err
}
if m == int(size) {
Expand All @@ -210,19 +192,19 @@ func (node *Node) read() (respraw []byte, err error) {
func (node *Node) response() (response interface{}, err error) {
rawresp, err := node.read()
if err != nil {
node.RecordError()
node.RecordError(1.0)
return nil, err
}

err = validateResponseHeader(rawresp)
if err != nil {
node.RecordError()
node.RecordError(1.0)
return nil, err
}

response, err = unmarshalResponse(rawresp)
if err != nil || response == nil {
node.RecordError()
node.RecordError(1.0)
return nil, err
}

Expand All @@ -232,13 +214,13 @@ func (node *Node) response() (response interface{}, err error) {
func (node *Node) request(reqstruct interface{}, structname string) (err error) {
marshaledRequest, err := proto.Marshal(reqstruct.(proto.Message))
if err != nil {
node.RecordError()
node.RecordError(1.0)
return err
}

err = node.rawRequest(marshaledRequest, structname)
if err != nil {
node.RecordError()
node.RecordError(1.0)
return err
}

Expand All @@ -248,13 +230,13 @@ func (node *Node) request(reqstruct interface{}, structname string) (err error)
func (node *Node) rawRequest(marshaledRequest []byte, structname string) (err error) {
formattedRequest, err := prependRequestHeader(structname, marshaledRequest)
if err != nil {
node.RecordError()
node.RecordError(1.0)
return err
}

err = node.write(formattedRequest)
if err != nil {
node.RecordError()
node.RecordError(1.0)
return err
}
return
Expand Down
39 changes: 29 additions & 10 deletions pool.go
Expand Up @@ -8,11 +8,9 @@ import (
)

const (
NODE_WRITE_RETRY time.Duration = time.Second * 5 // 5s
NODE_READ_RETRY time.Duration = time.Second * 5 // 5s
NODE_DOWN_RETRY time.Duration = time.Second / 2 // 0.5s
NODE_DOWN_MAX_RETRY time.Duration = time.Second * 5 // 5s
NODE_DOWN_RETRY_INCREMET time.Duration = time.Second / 2 // 0.5s
NODE_WRITE_RETRY time.Duration = time.Second * 10 // 10s
NODE_READ_RETRY time.Duration = time.Second * 10 // 10s
NODE_ERROR_THRESHOLD float64 = 0.1
)

type Pool struct {
Expand All @@ -26,7 +24,7 @@ func NewPool(cluster []string) *Pool {
nodeMap := make(map[string]*Node, len(cluster))

for _, node := range cluster {
newNode, err := NewNode(node, NODE_READ_RETRY, NODE_WRITE_RETRY, NODE_DOWN_RETRY)
newNode, err := NewNode(node, NODE_READ_RETRY, NODE_WRITE_RETRY)
if err == nil {
nodeMap[node] = newNode
}
Expand All @@ -46,18 +44,39 @@ func (pool *Pool) SelectNode() (*Node, error) {

var possibleNodes []*Node
for _, node := range pool.nodes {
if node.GetOk() {
if node.ErrorRate() < NODE_ERROR_THRESHOLD {
possibleNodes = append(possibleNodes, node)
}
}

if len(possibleNodes) > 0 {
return possibleNodes[rand.Int31n(int32(len(possibleNodes)))], nil
count := len(possibleNodes)

if count > 0 {
return possibleNodes[rand.Int31n(int32(count))], nil
}

return nil, ErrAllNodesDown
}

func (pool *Pool) Ping() {
pool.Lock()
defer pool.Unlock()

for _, node := range pool.nodes {
nodeGood := node.Ping()
if nodeGood == false {
node.RecordError(1.0)
node.Lock()
node.Close()
node.Dial()
node.Unlock()
} else {
node.SetOk(true)
}

}
}

func (pool *Pool) Close() {
for _, node := range pool.nodes {
node.Close()
Expand All @@ -71,7 +90,7 @@ func (pool *Pool) Size() int {
func (pool *Pool) String() string {
var outString string
for _, node := range pool.nodes {
nodeString := fmt.Sprintf(" [%s %f <%t>] ", node.addr, node.GetOk())
nodeString := fmt.Sprintf(" [%s %f <%t>] ", node.addr, node.ErrorRate(), node.GetOk())
outString += nodeString
}
return outString
Expand Down

0 comments on commit c46701f

Please sign in to comment.