-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
142 lines (120 loc) · 3.7 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package lib
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Node ...
type Node struct {
Address string `mapstructure:"address"`
Port string `mapstructure:"port"`
HealthURL string `mapstructure:"health"`
healthy bool
ActiveConnections int
mux sync.Mutex
}
var (
nodeActiveConnections = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "pblb_node_active_connections",
Help: "The total number of node active connections",
},
[]string{"node"},
)
)
// Init readies the node for requests.
func (n *Node) Init() {
n.ActiveConnections = 0
n.SetHealthy()
}
// SetHealthy sets the node to "healthy", meaning it is perceived to be able
// to successfully process requests.
func (n *Node) SetHealthy() {
n.healthy = true
}
// SetUnhealthy sets the node to "unhealthy", meaning it is not perceived to be
// able to successfully process requests.
func (n *Node) SetUnhealthy() {
n.healthy = false
}
// IsHealthy returns true if healthy.
func (n *Node) IsHealthy() bool {
return n.healthy
}
// IsUnhealthy returns true if unhealthy.
func (n *Node) IsUnhealthy() bool {
return !n.IsHealthy()
}
// CheckHealth manually performs a health check at the node's health url. It
// returns true if healthy, false if unhealthy. Note: This does not set the
// node's `healthy:bool` field, as that's the purview of the orchestrating
// load-balancer.
func (n *Node) CheckHealth() bool {
url := fmt.Sprintf("http://%s:%s%s", n.Address, n.Port, n.HealthURL)
resp, err := http.Get(url)
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}
func (n *Node) incActiveConnections() {
n.mux.Lock()
n.ActiveConnections++
nodeActiveConnections.WithLabelValues(n.Address).Set(float64(n.ActiveConnections))
n.mux.Unlock()
}
func (n *Node) decActiveConnections() {
n.mux.Lock()
n.ActiveConnections--
nodeActiveConnections.WithLabelValues(n.Address).Set(float64(n.ActiveConnections))
n.mux.Unlock()
}
// Handler forwards request to node. Based on https://stackoverflow.com/a/34725635
func (n *Node) Handler(w http.ResponseWriter, req *http.Request) int {
n.incActiveConnections()
defer n.decActiveConnections()
// we need to buffer the body if we want to read it here and send it
// in the request.
body, err := ioutil.ReadAll(req.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Printf("%d %s", http.StatusInternalServerError, err.Error())
return http.StatusInternalServerError
}
// you can reassign the body if you need to parse it as multipart
req.Body = ioutil.NopCloser(bytes.NewReader(body))
// create a new url from the raw RequestURI sent by the client
url := fmt.Sprintf("http://%s:%s/%s", n.Address, n.Port, req.RequestURI)
proxyReq, _ := http.NewRequest(req.Method, url, bytes.NewReader(body))
// We may want to filter some headers, otherwise we could just use a shallow
// copy proxyReq.Header = req.Header
proxyReq.Header = make(http.Header)
for h, val := range req.Header {
proxyReq.Header[h] = val
}
client := &http.Client{}
resp, err := client.Do(proxyReq)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
log.Printf("%d %s", http.StatusBadGateway, err.Error())
return http.StatusBadGateway
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
log.Printf("%d %s", http.StatusBadGateway, err.Error())
return http.StatusBadGateway
}
if resp.StatusCode != http.StatusOK {
w.WriteHeader(resp.StatusCode)
}
w.Write(b)
return resp.StatusCode
}