Skip to content

Commit

Permalink
Reconnecting works.
Browse files Browse the repository at this point in the history
  • Loading branch information
mrb committed Jun 19, 2013
1 parent b999897 commit a34a81d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 21 deletions.
39 changes: 22 additions & 17 deletions conn.go
Expand Up @@ -72,9 +72,7 @@ func (c *Conn) Close() {
}

func (c *Conn) SelectNode() *Node {
c.pool.Lock()
node := c.pool.SelectNode()
c.pool.Unlock()
return node
}

Expand All @@ -83,6 +81,7 @@ func (c *Conn) Pool() *Pool {
}

func (pool *Pool) SelectNode() *Node {
pool.Lock()
errorThreshold := 0.1
var possibleNodes []*Node

Expand All @@ -96,13 +95,26 @@ func (pool *Pool) SelectNode() *Node {

numPossibleNodes := len(possibleNodes)

var chosenNode *Node
if numPossibleNodes > 0 {
pool.current = possibleNodes[rand.Int31n(int32(numPossibleNodes))]
chosenNode = possibleNodes[rand.Int31n(int32(numPossibleNodes))]
} else {
pool.current = pool.RandomNode()
chosenNode = pool.RandomNode()
}
current := pool.current
return current

resp, err := chosenNode.ReqResp([]byte{}, "RpbPingReq", true)
if resp == nil || string(resp.([]byte)) != "Pong" || err != nil {
chosenNode.RecordError(1.0)
chosenNode.Dial()
//pool.DeleteNode(chosenNode.addr)
chosenNode = pool.RandomNode()
}

pool.current = chosenNode

pool.Unlock()

return chosenNode
}

func (pool *Pool) RandomNode() *Node {
Expand All @@ -125,14 +137,6 @@ func (pool *Pool) RandomNode() *Node {

func (pool *Pool) DeleteNode(nodeKey string) {
delete(pool.nodes, nodeKey)

var nodeStrings []string

for k, _ := range pool.nodes {
nodeStrings = append(nodeStrings, k)
}

return
}

func (pool *Pool) Close() {
Expand Down Expand Up @@ -182,13 +186,14 @@ func newPool(cluster []string) *Pool {

log.Print("[POOL] New connection Pool established. Attempting connection to ", len(pool.nodes), " Riak nodes.")

pool.SelectNode()

return pool
}

func (conn *Conn) ReqResp(reqstruct interface{}, structname string, raw bool) (response interface{}, err error) {
node := conn.SelectNode()
return conn.SelectNode().ReqResp(reqstruct, structname, raw)
}

func (node *Node) ReqResp(reqstruct interface{}, structname string, raw bool) (response interface{}, err error) {
node.Lock()
if raw == true {
err = node.RawRequest(reqstruct.([]byte), structname)
Expand Down
2 changes: 1 addition & 1 deletion decaying.go
Expand Up @@ -16,7 +16,7 @@ func NewDecaying() *Decaying {
return &Decaying{
p: 0.0,
e: math.E,
r: math.Log(0.5) / 10,
r: math.Log(0.5) / 10 * 1e2,
t0: time.Now(),
}
}
Expand Down
8 changes: 5 additions & 3 deletions example/example.go
Expand Up @@ -28,7 +28,7 @@ func main() {

for g := 0; g < 4; g++ {
go func(which int) {
log.Print(which)
log.Print("<", which, "> Loaded")
var times int
for {
actionBegin := time.Now()
Expand All @@ -37,7 +37,10 @@ func main() {
riak.StoreObject("bucket", "data", "{'ok':'ok'}")
riak.SetClientId("coolio")
riak.GetClientId()
data, _ := riak.FetchObject("bucket", "data")
data, err := riak.FetchObject("bucket", "data")
if err != nil {
break
}
if string(data.GetContent()[0].GetValue()) != "{'ok':'ok'}" {
log.Fatal("FUCK")
}
Expand All @@ -49,7 +52,6 @@ func main() {
}
}(g)
}
log.Print("DONE")
<-c
actionEnd = time.Now()
actionDuration := actionEnd.Sub(actionBegin)
Expand Down

0 comments on commit a34a81d

Please sign in to comment.