/
main.go
165 lines (141 loc) · 4.26 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Proxy is a naive HTTP reverse proxy that limits the number of in-flight
// requests to the origin server.
//
// With -adaptive, the limit is adjusted at runtime:
//   - Additive increase: on HTTP 200 OK responses, the limit grows by one, at most once per second.
//   - Multiplicative decrease: on any other response status or proxy error, it backs off to 75%.
package main
import (
"flag"
"fmt"
"log"
"math"
"net/http"
"net/http/httputil"
_ "net/http/pprof"
"net/url"
"runtime"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/time/rate"
)
// main parses flags, registers Prometheus metrics, wires up the quota-limited
// reverse proxy on the default mux and serves until ListenAndServe fails.
func main() {
	originAddr := flag.String("origin", "http://localhost:8000", "origin address where to proxy requests")
	addr := flag.String("addr", ":7000", "address to listen to")
	quota := flag.Int64("quota", 5, "allowed number of concurrent requests")
	adaptive := flag.Bool("adaptive", false, "adaptive capacity control")
	flag.Parse()

	// With a limit below 1 every request is rejected. The limit can only grow
	// after a successful response, so it would never recover.
	if *quota < 1 {
		log.Fatalf("💥 Proxy: -quota must be at least 1, got %d", *quota)
	}

	// Enable mutex contention sampling for the pprof handlers registered by
	// the blank net/http/pprof import.
	runtime.SetMutexProfileFraction(5)

	inflightRequests := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "proxy_inflight_requests",
		Help: "How many HTTP requests are in-flight.",
	})
	targetInflightRequests := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "proxy_target_inflight_requests",
		Help: "How many HTTP requests should be in-flight.",
	})
	prometheus.MustRegister(inflightRequests)
	prometheus.MustRegister(targetInflightRequests)
	http.Handle("/metrics", promhttp.Handler())

	inflight := NewQuota(*quota, inflightRequests, targetInflightRequests)
	// incLimiter throttles additive increase, which is attempted on every
	// HTTP 200 OK response: the limit grows by at most one per second.
	incLimiter := rate.NewLimiter(rate.Limit(1), 1)

	target, err := url.Parse(*originAddr)
	if err != nil {
		log.Fatalf("💥 Proxy: failed to parse origin url: %v", err)
	}
	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.ModifyResponse = func(resp *http.Response) error {
		if !*adaptive {
			return nil
		}
		// Multiplicative decrease: any non-200 response shrinks the limit to 75%.
		if resp.StatusCode != http.StatusOK {
			inflight.Backoff(0.75)
			return nil
		}
		// Additive increase: raise the limit by one, at most once per second.
		if incLimiter.Allow() {
			inflight.Inc()
		}
		return nil
	}
	proxy.ErrorHandler = func(rw http.ResponseWriter, r *http.Request, err error) {
		log.Printf("💥 Proxy: %v", err)
		rw.WriteHeader(http.StatusBadGateway)
		if *adaptive {
			// Failing to reach the origin is treated like overload.
			inflight.Backoff(0.75)
		}
	}
	http.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) {
		if !inflight.Receive() {
			rw.WriteHeader(http.StatusTooManyRequests)
			fmt.Fprint(rw, "▶\n")
			return
		}
		// Deferred so the slot is returned even if ServeHTTP panics.
		// ReverseProxy panics with http.ErrAbortHandler when it cannot finish
		// copying a response body, and net/http recovers that panic.
		defer inflight.Release()
		proxy.ServeHTTP(rw, r)
	})
	// ListenAndServe always returns a non-nil error; surface it instead of
	// exiting silently.
	log.Fatal(http.ListenAndServe(*addr, nil))
}
// Quota is a limited quantity of requests allowed to be in-flight.
//
// used and max are read and modified with sync/atomic operations by the
// methods of this type. Keep both int64 fields at the start of the struct:
// sync/atomic requires 64-bit alignment for 64-bit operations on 32-bit
// platforms. Only the first word of an allocated struct is guaranteed to
// have that alignment.
type Quota struct {
// used is the number of admitted requests that have not been released yet.
used int64
// max is the in-flight limit; Receive rejects new requests while used >= max.
max int64
// current is incremented/decremented alongside used. In main it is the
// proxy_inflight_requests gauge.
current prometheus.Gauge
// target is updated by Inc and Backoff to report the limit. In main it is
// the proxy_target_inflight_requests gauge.
target prometheus.Gauge
}
// NewQuota creates a quota that admits at most n concurrent in-flight requests.
//
// current is the gauge that tracks requests in flight. target is the gauge
// that reports the limit. target is initialized to n, so the metric is
// correct before the first Inc/Backoff, and also when the limit is never
// adjusted.
func NewQuota(n int64, current, target prometheus.Gauge) *Quota {
	target.Set(float64(n))
	return &Quota{
		max:     n,
		current: current,
		target:  target,
	}
}
// Receive tries to take one unit of quota.
//
// It returns true, and counts the request as in-flight, only if fewer than
// max requests were in flight when the slot was claimed. Otherwise it
// returns false and changes nothing. The caller must call Release exactly
// once after a true result.
//
// The limit check and the increment are combined through compare-and-swap.
// Concurrent callers therefore cannot all pass the check and overshoot max.
func (q *Quota) Receive() bool {
	for {
		used := atomic.LoadInt64(&q.used)
		// max may be lowered by a concurrent Backoff right after this load;
		// admitting one request against the old limit is acceptable.
		if used >= atomic.LoadInt64(&q.max) {
			return false
		}
		if atomic.CompareAndSwapInt64(&q.used, used, used+1) {
			q.current.Inc()
			return true
		}
		// Another goroutine changed used between the load and the CAS; retry.
	}
}
// Release hands back one unit of quota taken by a successful Receive and
// lowers the in-flight gauge by the same amount.
func (q *Quota) Release() {
	const slot = 1
	atomic.AddInt64(&q.used, -slot)
	q.current.Sub(slot)
}
// Inc raises the in-flight limit by one and publishes the new limit.
//
// The target gauge is set to the exact new value rather than incremented, so
// it tracks max, matching what Backoff does. Under concurrent Inc/Backoff
// calls the gauge may briefly show a value that was just superseded.
func (q *Quota) Inc() {
	newMax := atomic.AddInt64(&q.max, 1)
	q.target.Set(float64(newMax))
}
// Backoff scales the in-flight limit by the fraction p (expected
// 0 <= p <= 1), rounding up. For example, p = 0.75 backs off to 75% when the
// origin is overloaded.
//
// Because of the rounding, a limit of 1 stays at 1 for any p > 0. The update
// is retried with compare-and-swap until it applies to an unchanged max. The
// resulting limit is then published to the target gauge.
func (q *Quota) Backoff(p float64) {
	var shrunk float64
	for swapped := false; !swapped; {
		prev := atomic.LoadInt64(&q.max)
		shrunk = math.Ceil(p * float64(prev))
		swapped = atomic.CompareAndSwapInt64(&q.max, prev, int64(shrunk))
	}
	q.target.Set(shrunk)
}