-
Notifications
You must be signed in to change notification settings - Fork 1
/
backend.go
139 lines (118 loc) · 3.67 KB
/
backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package pkg
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
"net"
"net/http"
"net/url"
"sync"
"time"
)
// Retry settings used by the reverse-proxy error handler installed in GetBackendConnection.
const (
// RetryID is the context key under which a request's retry count is stored
// (written with context.WithValue, read back by GetRetryFromContext).
// NOTE(review): the key is a built-in int; the context package recommends a
// package-private key type to avoid collisions — confirm before changing, since
// this constant is exported.
RetryID int = 1
// RetryAttempts is the retry count at which the error handler stops retrying
// and marks the backend as not alive.
RetryAttempts int = 10
// RetryDelayInMS is the pause between retries expressed as a bare count; it is
// multiplied by time.Millisecond where it is used, so 50 means 50ms.
RetryDelayInMS time.Duration = 50
)
// Backend is unique for a given host:port. This might be pointing to a single machine or possibly a LB/cluster.
// The Backend has a collection of BackendConnections. These BackendConnections are the REAL connections to the given
// target machine.
type Backend struct {
// Host is passed to NewBackendConnection and parsed with url.Parse by the
// health check, so it is expected to be a URL rather than a bare hostname.
Host string
// Port is stored by NewBackend; none of the methods visible in this file read it.
Port int
// BackendConnections is the pool of connections, grown on demand by GetBackendConnection.
BackendConnections []*BackendConnection
// MaxConnections is the upper bound on len(BackendConnections).
MaxConnections int
// mux is held by GetBackendConnection while it scans or grows BackendConnections.
mux sync.RWMutex
// Alive records whether this backend is currently considered alive or dead.
// Use IsAlive/SetIsAlive, which take aliveMux, rather than touching it directly.
Alive bool
// aliveMux guards Alive.
aliveMux sync.RWMutex
}
// NewBackend builds a Backend for the given host and port that may hold up to
// maxConnections pooled connections. A new Backend starts out marked alive and
// with an empty connection pool.
func NewBackend(host string, port int, maxConnections int) *Backend {
	return &Backend{
		Host:           host,
		Port:           port,
		MaxConnections: maxConnections,
		Alive:          true,
	}
}
// LogStats logs how many of this backend's pooled connections are currently in
// use. It is a diagnostic aid only and always returns nil.
func (b *Backend) LogStats() error {
	// The pool is appended to by GetBackendConnection under mux, so the scan
	// must hold the read lock to avoid racing with a concurrent append. The
	// lock is released before logging so writers are not held up by log I/O.
	inUse := func() int {
		b.mux.RLock()
		defer b.mux.RUnlock()

		count := 0
		for _, conn := range b.BackendConnections {
			if conn.IsInUse() {
				count++
			}
		}
		return count
	}()

	log.Infof("Backend %s : currently in use %d", b.Host, inUse)
	return nil
}
// GetRetryFromContext returns the retry count stored in the request's context
// under RetryID. It returns 0 when no value is stored or the stored value is
// not an int.
func GetRetryFromContext(r *http.Request) int {
	attempts, found := r.Context().Value(RetryID).(int)
	if !found {
		return 0
	}
	return attempts
}
// GetBackendConnection hands out a connection to this backend. It returns an
// idle connection from the pool if one exists; otherwise, while the pool is
// below MaxConnections, it creates a new connection, adds it to the pool and
// returns it. The returned connection is already marked in use. When the pool
// is full and every connection is busy, it returns an error.
//
// Each newly created connection gets a ReverseProxy error handler that retries
// a failed request up to RetryAttempts times, pausing RetryDelayInMS
// milliseconds between attempts, and marks the backend as not alive once the
// retries are exhausted.
func (b *Backend) GetBackendConnection() (*BackendConnection, error) {
	// TODO(kpfaulkner) benchmark this!
	b.mux.Lock()
	defer b.mux.Unlock()

	// Reuse an idle pooled connection if there is one.
	for _, conn := range b.BackendConnections {
		if !conn.IsInUse() {
			conn.SetInUse(true)
			return conn, nil
		}
	}

	// Nothing idle and the pool is already at capacity.
	if len(b.BackendConnections) >= b.MaxConnections {
		return nil, fmt.Errorf("unable to provide backendconnection for request")
	}

	conn := NewBackendConnection(b.Host)
	conn.SetInUse(true)
	conn.ReverseProxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
		ctx := request.Context()
		retries := GetRetryFromContext(request)

		// Only retry while the request is still live; once its context is
		// done every further attempt would fail immediately.
		if ctx.Err() == nil && retries < RetryAttempts {
			log.Errorf("Failed query, delaying and retrying: %d : %s", retries, e.Error())
			select {
			case <-time.After(RetryDelayInMS * time.Millisecond):
				retryCtx := context.WithValue(ctx, RetryID, retries+1)
				conn.ReverseProxy.ServeHTTP(writer, request.WithContext(retryCtx))
				return
			case <-ctx.Done():
				// Request ended while waiting; fall through and finish up.
			}
		}

		// A cancelled context means the caller went away, which says nothing
		// about the backend's health, so do not mark the backend as dead.
		if ctx.Err() == context.Canceled {
			log.Errorf("Request cancelled, not retrying: %s", e.Error())
			writer.WriteHeader(http.StatusTooManyRequests)
			return
		}

		b.SetIsAlive(false)
		log.Errorf("Backend <find ID> returned error. Pausing... %s", e.Error())
		writer.WriteHeader(http.StatusTooManyRequests)
	}

	b.BackendConnections = append(b.BackendConnections, conn)
	return conn, nil
}
// checkHealth confirms whether the host configured for this backend accepts a
// TCP connection and records the outcome via SetIsAlive. A plain TCP dial is
// used rather than an HTTP request because only reachability matters here.
//
// A failed dial is not reported as an error: it marks the backend as not alive
// and returns nil. A non-nil error is returned only when Host cannot be parsed
// as a URL, in which case the backend is also marked as not alive.
//
// NOTE(review): the dial address is taken from the parsed URL only; a Host
// without an explicit port will fail to dial, and the Port field is not
// consulted — confirm whether that is intended.
func (b *Backend) checkHealth() error {
	const dialTimeout = 3 * time.Second

	u, err := url.Parse(b.Host)
	if err != nil {
		// url.Parse returns a nil URL on failure; dereferencing it would panic.
		b.SetIsAlive(false)
		return fmt.Errorf("parsing backend host %q: %w", b.Host, err)
	}

	conn, err := net.DialTimeout("tcp", u.Host, dialTimeout)
	b.SetIsAlive(err == nil)
	if err != nil {
		log.Infof("healthcheck for %s is %v", b.Host, err == nil)
		return nil
	}

	// Only reachability matters; the probe connection is closed right away.
	conn.Close()
	return nil
}
// IsAlive reports whether the backend is currently marked alive. The flag is
// read under aliveMux's read lock.
func (b *Backend) IsAlive() bool {
	b.aliveMux.RLock()
	defer b.aliveMux.RUnlock()
	return b.Alive
}
// SetIsAlive marks the backend as alive or not alive. The flag is written
// under aliveMux's write lock.
func (b *Backend) SetIsAlive(alive bool) {
	b.aliveMux.Lock()
	defer b.aliveMux.Unlock()
	b.Alive = alive
}