-
Notifications
You must be signed in to change notification settings - Fork 0
/
roundrobin.go
113 lines (100 loc) · 2.99 KB
/
roundrobin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package lib
import (
"log"
"net/http"
"sync"
"time"
)
// RoundRobin struct contains:
// - A slice of node pointers
// - The current "active" node index
// - The maximum number of Healthy nodes
type RoundRobin struct {
Nodes []*Node
current int
max int
mux sync.Mutex
}
// NewRoundRobin creates a new RoundRobin load balancer.
func NewRoundRobin(nodes []*Node) *RoundRobin {
rr := RoundRobin{Nodes: nodes, current: 0, max: len(nodes)}
healthyNodesGauge.Set(float64(rr.max))
totalNodesGauge.Set(float64(rr.max))
rr.AsyncHealthChecks()
return &rr
}
// AsyncHealthChecks performs health checks in the background at an interval
// set by asyncHealthChecksTimeSeconds.
func (rr *RoundRobin) AsyncHealthChecks() {
go func() {
for {
log.Println("Performing async health checks")
healthyNodes := 0
for _, n := range rr.Nodes {
if n.CheckHealth() {
rr.idempotentRecoverNode(n)
healthyNodes++
} else {
rr.idempotentDeactivateNode(n)
}
}
log.Printf("%d out of %d nodes are healthy", healthyNodes, len(rr.Nodes))
healthyNodesGauge.Set(float64(healthyNodes))
time.Sleep(asyncHealthChecksTimeSeconds * time.Second)
}
}()
}
// Handler selects a node via round robin and passes the request to the
// selected node.
func (rr *RoundRobin) Handler(w http.ResponseWriter, r *http.Request) {
node := rr.selectNode()
log.Printf("Handling request to %s:%s. Active Connections: %d. Method: RoundRobin.\n", node.Address, node.Port, node.ActiveConnections)
switch status := node.Handler(w, r); {
case status >= 500:
log.Printf("Node %s:%s failed to process request. Status: %d.\n", node.Address, node.Port, status)
rr.idempotentDeactivateNode(node)
processedTotal.WithLabelValues("5xx", node.Address).Inc()
case status >= 400:
rr.idempotentRecoverNode(node)
processedTotal.WithLabelValues("4xx", node.Address).Inc()
case status >= 300:
rr.idempotentRecoverNode(node)
processedTotal.WithLabelValues("3xx", node.Address).Inc()
case status >= 200:
rr.idempotentRecoverNode(node)
processedTotal.WithLabelValues("2xx", node.Address).Inc()
default:
rr.idempotentRecoverNode(node)
processedTotal.WithLabelValues("1xx", node.Address).Inc()
}
}
func (rr *RoundRobin) selectNode() *Node {
rr.mux.Lock()
defer rr.mux.Unlock()
node := rr.Nodes[rr.current]
count := 0
// If there's no healthy nodes, just serve round robin. Otherwise, iterate
// until we get a healthy node.
//
// TODO: We should set an "inversion" value in config.yaml when, if the sum
// of healthy nodes is equal or below this value, we just serve to all nodes.
for node.IsUnhealthy() && count < rr.max {
rr.current = (rr.current + 1) % rr.max
node = rr.Nodes[rr.current]
count++
}
rr.current = (rr.current + 1) % rr.max
return node
}
func (rr *RoundRobin) idempotentRecoverNode(n *Node) {
if n.IsUnhealthy() {
n.SetHealthy()
healthyNodesGauge.Inc()
}
}
func (rr *RoundRobin) idempotentDeactivateNode(n *Node) {
if n.IsHealthy() {
n.SetUnhealthy()
healthyNodesGauge.Dec()
}
}