Skip to content

Commit

Permalink
addresses issue #41, among other things
Browse files Browse the repository at this point in the history
  • Loading branch information
boj committed Aug 6, 2013
1 parent 0d4e407 commit 7016ccc
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 175 deletions.
36 changes: 18 additions & 18 deletions client.go
Expand Up @@ -2,7 +2,6 @@ package riakpbc

import (
"log"
"time"
)

type Client struct {
Expand All @@ -14,6 +13,8 @@ type Client struct {
}

// NewClient accepts a slice of node address strings and returns a Client object.
//
// Illegally addressed nodes will be rejected in the NewPool call.
func NewClient(cluster []string) *Client {
return &Client{
cluster: cluster,
Expand All @@ -24,6 +25,8 @@ func NewClient(cluster []string) *Client {
}

// NewClientWihtCoder accepts a slice of node address strings, a Coder for processing structs into data, and returns a Client object.
//
// Illegally addressed nodes will be rejected in the NewPool call.
func NewClientWithCoder(cluster []string, coder *Coder) *Client {
return &Client{
cluster: cluster,
Expand All @@ -36,15 +39,14 @@ func NewClientWithCoder(cluster []string, coder *Coder) *Client {

// Dial connects all nodes in the pool to their addresses via TCP.
//
// Illegally addressed nodes will be rejected here.
// Nodes which are down get set to redial in the background.
func (c *Client) Dial() error {
for k, node := range c.pool.nodes {
for _, node := range c.pool.nodes {
err := node.Dial()
if err != nil {
if c.LoggingEnabled() {
log.Print("[POOL] Error: ", err)
}
c.pool.DeleteNode(k)
}
}

Expand All @@ -56,27 +58,17 @@ func (c *Client) Dial() error {
return ErrZeroNodes
}

go c.BackgroundNodePing()

return nil
}

func (c *Client) BackgroundNodePing() {
for {
time.Sleep(time.Duration(c.pingFrequency) * time.Millisecond)
c.pool.Ping()
}
}

// Close closes the node TCP connections.
func (c *Client) Close() {
c.pool.Close()
}

// SelectNode selects a node from the pool, see *Pool.SelectNode()
func (c *Client) SelectNode() *Node {
node := c.pool.SelectNode()
return node
func (c *Client) SelectNode() (*Node, error) {
return c.pool.SelectNode()
}

// Pool returns the pool associated with the client.
Expand Down Expand Up @@ -149,14 +141,22 @@ func (c *Client) DoStruct(opts interface{}, in interface{}) (interface{}, error)

// ReqResp is the top level interface for the client for a bulk of Riak operations
func (c *Client) ReqResp(reqstruct interface{}, structname string, raw bool) (response interface{}, err error) {
return c.SelectNode().ReqResp(reqstruct, structname, raw)
node, err := c.SelectNode()
if err != nil {
return nil, err
}
return node.ReqResp(reqstruct, structname, raw)
}

// ReqMultiResp is the top level interface for the client for the few
// operations which have to hit the server multiple times to guarantee
// a complete response: List keys, Map Reduce, etc.
func (c *Client) ReqMultiResp(reqstruct interface{}, structname string) (response interface{}, err error) {
return c.SelectNode().ReqMultiResp(reqstruct, structname)
node, err := c.SelectNode()
if err != nil {
return nil, err
}
return node.ReqMultiResp(reqstruct, structname)
}

func (c *Client) EnableLogging() {
Expand Down
53 changes: 0 additions & 53 deletions decaying.go

This file was deleted.

22 changes: 0 additions & 22 deletions decaying_test.go

This file was deleted.

1 change: 1 addition & 0 deletions err.go
Expand Up @@ -16,4 +16,5 @@ var (
ErrWriteTimeout = errors.New("write timeout")
ErrZeroNodes = errors.New("zero nodes in pool")
ErrNoContent = errors.New("no content")
ErrAllNodesDown = errors.New("all nodes down")
)
69 changes: 44 additions & 25 deletions node.go
Expand Up @@ -16,14 +16,14 @@ type Node struct {
conn *net.TCPConn
readTimeout time.Duration
writeTimeout time.Duration
errorRate *Decaying
retryTimeout time.Duration
ok bool
oklock *sync.Mutex
sync.Mutex
}

// Returns a new Node.
func NewNode(addr string, readTimeout, writeTimeout time.Duration) (*Node, error) {
func NewNode(addr string, readTimeout, writeTimeout, retryTimeout time.Duration) (*Node, error) {
tcpaddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
Expand All @@ -34,37 +34,27 @@ func NewNode(addr string, readTimeout, writeTimeout time.Duration) (*Node, error
tcpAddr: tcpaddr,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
errorRate: NewDecaying(),
ok: true,
retryTimeout: retryTimeout,
ok: false,
oklock: &sync.Mutex{},
}

return node, nil
}

// Dialaconnects to a single riak node.
// Dial connects to a single riak node. A Node is not OK until it has successfully dialed.
func (node *Node) Dial() (err error) {
node.conn, err = net.DialTCP("tcp", nil, node.tcpAddr)
if err != nil {
node.RecordError()
return err
}

node.SetOk(true)
node.conn.SetKeepAlive(true)

return nil
}

// ErrorRate safely returns the current Node's error rate
func (node *Node) ErrorRate() float64 {
return node.errorRate.Value()
}

// RecordErrror increments the current error value - see decaying.go
func (node *Node) RecordError(amount float64) {
node.SetOk(false)
node.errorRate.Add(amount)
}

func (node *Node) GetOk() bool {
var out bool
node.oklock.Lock()
Expand All @@ -79,6 +69,35 @@ func (node *Node) SetOk(ok bool) {
node.oklock.Unlock()
}

// RecordError sets the Node into a redial state. The Node reports itself as down until it has redialed.
func (node *Node) RecordError() {
if node.GetOk() {
node.SetOk(false)
go node.BackgroundRedial()
}
}

// BackgroundRedial continues to redial the Node in the background every retryTimeout, up to NODE_DOWN_MAX_RETRY.
func (node *Node) BackgroundRedial() {
node.Lock()
time.Sleep(node.retryTimeout)
node.Unlock()

if err := node.Dial(); err == nil {
node.Lock()
node.retryTimeout = NODE_DOWN_RETRY
node.Unlock()
return
}

node.Lock()
if node.retryTimeout < NODE_DOWN_MAX_RETRY {
node.retryTimeout += NODE_DOWN_RETRY_INCREMET
}
node.Unlock()
go node.BackgroundRedial()
}

func (node *Node) ReqResp(reqstruct interface{}, structname string, raw bool) (response interface{}, err error) {
node.Lock()
if raw == true {
Expand Down Expand Up @@ -178,7 +197,7 @@ func (node *Node) read() (respraw []byte, err error) {
// read rest of message
m, err := io.ReadFull(node.conn, data)
if err != nil {
node.RecordError(1.0)
node.RecordError()
return nil, err
}
if m == int(size) {
Expand All @@ -191,19 +210,19 @@ func (node *Node) read() (respraw []byte, err error) {
func (node *Node) response() (response interface{}, err error) {
rawresp, err := node.read()
if err != nil {
node.RecordError(1.0)
node.RecordError()
return nil, err
}

err = validateResponseHeader(rawresp)
if err != nil {
node.RecordError(1.0)
node.RecordError()
return nil, err
}

response, err = unmarshalResponse(rawresp)
if err != nil || response == nil {
node.RecordError(1.0)
node.RecordError()
return nil, err
}

Expand All @@ -213,13 +232,13 @@ func (node *Node) response() (response interface{}, err error) {
func (node *Node) request(reqstruct interface{}, structname string) (err error) {
marshaledRequest, err := proto.Marshal(reqstruct.(proto.Message))
if err != nil {
node.RecordError(1.0)
node.RecordError()
return err
}

err = node.rawRequest(marshaledRequest, structname)
if err != nil {
node.RecordError(1.0)
node.RecordError()
return err
}

Expand All @@ -229,13 +248,13 @@ func (node *Node) request(reqstruct interface{}, structname string) (err error)
func (node *Node) rawRequest(marshaledRequest []byte, structname string) (err error) {
formattedRequest, err := prependRequestHeader(structname, marshaledRequest)
if err != nil {
node.RecordError(1.0)
node.RecordError()
return err
}

err = node.write(formattedRequest)
if err != nil {
node.RecordError(1.0)
node.RecordError()
return err
}
return
Expand Down
1 change: 1 addition & 0 deletions object_test.go
Expand Up @@ -24,6 +24,7 @@ func setupConnection(t *testing.T) (client *Client) {
"127.0.0.1:8088",
"127.0.0.1:8089"},
coder)
//client.EnableLogging()
var err error
if err = client.Dial(); err != nil {
t.Error(err.Error())
Expand Down

0 comments on commit 7016ccc

Please sign in to comment.