/
eskipbytes.go
180 lines (153 loc) · 4.51 KB
/
eskipbytes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package routesrv
import (
"bytes"
"compress/gzip"
"crypto/sha256"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
ot "github.com/opentracing/opentracing-go"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/tracing"
)
type responseWriterInterceptor struct {
http.ResponseWriter
statusCode int
bytesWritten int
}
func (w *responseWriterInterceptor) WriteHeader(statusCode int) {
w.statusCode = statusCode
w.ResponseWriter.WriteHeader(statusCode)
}
func (w *responseWriterInterceptor) Header() http.Header {
return w.ResponseWriter.Header()
}
func (w *responseWriterInterceptor) Write(p []byte) (int, error) {
w.bytesWritten += len(p)
return w.ResponseWriter.Write(p)
}
// Unwrap will be used by ResponseController, so if they will use that
// to get the ResponseWrite for some reason they can do it.
func (w *responseWriterInterceptor) Unwrap() http.ResponseWriter {
return w.ResponseWriter
}
var (
_ http.ResponseWriter = &responseWriterInterceptor{}
)
// eskipBytes keeps eskip-formatted routes as a byte slice and
// provides synchronized r/w access to them. Additionally it can
// serve as an HTTP handler exposing its content.
type eskipBytes struct {
mu sync.RWMutex
data []byte
hash string
lastModified time.Time
initialized bool
count int
zw *gzip.Writer
zdata []byte
tracer ot.Tracer
metrics metrics.Metrics
now func() time.Time
}
// formatAndSet takes a slice of routes and stores them eskip-formatted
// in a synchronized way. It returns the length of the stored data, and
// flags signaling whether the data was initialized and updated.
func (e *eskipBytes) formatAndSet(routes []*eskip.Route) (_ int, _ string, initialized bool, updated bool) {
buf := &bytes.Buffer{}
eskip.Fprint(buf, eskip.PrettyPrintInfo{Pretty: false, IndentStr: ""}, routes...)
data := buf.Bytes()
e.mu.Lock()
defer e.mu.Unlock()
updated = !bytes.Equal(e.data, data)
if updated {
e.lastModified = e.now()
e.data = data
e.zdata = e.compressLocked(data)
e.hash = fmt.Sprintf("%x", sha256.Sum256(e.data))
e.count = len(routes)
}
initialized = !e.initialized
e.initialized = true
return len(e.data), e.hash, initialized, updated
}
// compressLocked compresses the data with gzip and returns
// the compressed data or nil if compression fails.
// e.mu must be held.
func (e *eskipBytes) compressLocked(data []byte) []byte {
var buf bytes.Buffer
if e.zw == nil {
e.zw = gzip.NewWriter(&buf)
} else {
e.zw.Reset(&buf)
}
if _, err := e.zw.Write(data); err != nil {
return nil
}
if err := e.zw.Close(); err != nil {
return nil
}
return buf.Bytes()
}
func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
span := tracing.CreateSpan("serve_routes", r.Context(), e.tracer)
defer span.Finish()
start := time.Now()
defer e.metrics.MeasureBackend("routersv", start)
w := &responseWriterInterceptor{
ResponseWriter: rw,
statusCode: http.StatusOK,
}
defer func() {
span.SetTag("status", w.statusCode)
span.SetTag("bytes", w.bytesWritten)
e.metrics.IncCounter(strconv.Itoa(w.statusCode))
}()
if r.Method != "GET" && r.Method != "HEAD" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
e.mu.RLock()
count := e.count
data := e.data
zdata := e.zdata
hash := e.hash
lastModified := e.lastModified
initialized := e.initialized
e.mu.RUnlock()
if initialized {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set(routing.RoutesCountName, strconv.Itoa(count))
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && len(zdata) > 0 {
w.Header().Set("Etag", `"`+hash+`+gzip"`)
w.Header().Set("Content-Encoding", "gzip")
http.ServeContent(w, r, "", lastModified, bytes.NewReader(zdata))
} else {
w.Header().Set("Etag", `"`+hash+`"`)
http.ServeContent(w, r, "", lastModified, bytes.NewReader(data))
}
} else {
w.WriteHeader(http.StatusNotFound)
}
}
// eskipBytesStatus serves as an HTTP health check for the referenced eskipBytes.
// Reports healthy only when the bytes were initialized (set at least once).
type eskipBytesStatus struct {
b *eskipBytes
}
const msgRoutesNotInitialized = "routes were not initialized yet"
func (s *eskipBytesStatus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.b.mu.RLock()
initialized := s.b.initialized
s.b.mu.RUnlock()
if initialized {
w.WriteHeader(http.StatusNoContent)
} else {
http.Error(w, msgRoutesNotInitialized, http.StatusServiceUnavailable)
}
}