-
Notifications
You must be signed in to change notification settings - Fork 250
/
httpcache.go
160 lines (144 loc) · 4.06 KB
/
httpcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package backporter
import (
"bufio"
"bytes"
"context"
"fmt"
"net/http"
"net/http/httputil"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
const (
xFromCache = "X-From-Cache"
cacheRefreshFreq = 10 * time.Minute
)
type bugzillaCache struct {
lock sync.Mutex
cache map[string][]byte
}
func (bc *bugzillaCache) get(key string) ([]byte, bool) {
bc.lock.Lock()
defer bc.lock.Unlock()
cachedVal, ok := bc.cache[key]
return cachedVal, ok
}
func (bc *bugzillaCache) set(key string, respBytes []byte) {
bc.lock.Lock()
defer bc.lock.Unlock()
bc.cache[key] = respBytes
}
var _ cache = &bugzillaCache{}
func newBugzillaCache() *bugzillaCache {
return &bugzillaCache{cache: map[string][]byte{}}
}
type cache interface {
get(string) ([]byte, bool)
set(string, []byte)
}
// cachingTransport is an implementation http.RoundTripper
// which first checks for cached values
type cachingTransport struct {
cache cache
transport http.RoundTripper
}
// RoundTrip will first check if there are any cached responses and return that
// if not it will make an HTTP call using the DefaultTransport
func (t *cachingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if req.Method != "GET" {
return t.transport.RoundTrip(req)
}
var resp *http.Response
g := errgroup.Group{}
g.Go(func() error {
var err error
// Disable the bodyclose linter, in this particular case the caller is responsible
// for closing the body.
// nolint:bodyclose
resp, err = t.transport.RoundTrip(req)
if err != nil {
return err
}
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound {
body, err := httputil.DumpResponse(resp, true)
if err != nil {
return fmt.Errorf("err while serializing response to cache: %w", err)
}
t.cache.set(req.URL.String(), body)
}
return nil
})
if cachedVal, isCached := t.cache.get(req.URL.String()); isCached {
b := bytes.NewBuffer(cachedVal)
cachedResp, err := http.ReadResponse(bufio.NewReader(b), req)
if err != nil {
return nil, err
}
cachedResp.Header.Set(xFromCache, "1")
return cachedResp, nil
}
if err := g.Wait(); err != nil {
return nil, err
}
return resp, nil
}
func refreshCache(bc *bugzillaCache, m prometheus.Gauge) {
var sem = semaphore.NewWeighted(int64(10))
ctx := context.Background()
logrus.WithField("cache_entries", len(bc.cache)).Info("Refreshing cache")
m.Set(float64(len(bc.cache)))
for url := range bc.cache {
if err := sem.Acquire(ctx, 1); err != nil {
logrus.WithError(fmt.Errorf("failed to acquire semaphore for key %s: %w", url, err))
}
url := url
go func() {
defer sem.Release(1)
resp, err := http.Get(url)
if err != nil {
logrus.WithError(fmt.Errorf("cache refresh error - failed to fetch %s: %w", url, err))
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound {
body, err := httputil.DumpResponse(resp, true)
if err != nil {
logrus.WithError(fmt.Errorf("cache refresh error - DumpResponse failed %s: %w", url, err))
return
}
bc.set(url, body)
}
}()
}
}
// NewCachingTransport is a constructor for cachingTransport
// If an entry is present in the cache, it is immediately returned
// while also generating an async HTTP call to the bugzilla server to get the latest value
// which is stored in the cache.
// Therefore this cache does *NOT* reduce the HTTP traffic, and is only used to speed up the response.
func NewCachingTransport() http.RoundTripper {
t := cachingTransport{
cache: newBugzillaCache(),
transport: http.DefaultTransport,
}
cacheRefreshMetrics := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "bugzilla_backporter_cached_bugs",
Help: "bugs in cache to be refreshed",
},
)
prometheus.MustRegister(cacheRefreshMetrics)
ticker := time.NewTicker(cacheRefreshFreq)
go func() {
defer ticker.Stop()
for {
<-ticker.C
refreshCache(t.cache.(*bugzillaCache), cacheRefreshMetrics)
}
}()
return &t
}