/
put.go
342 lines (288 loc) · 10.7 KB
/
put.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
package endpoints
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"github.com/julienschmidt/httprouter"
"github.com/prebid/prebid-cache/backends"
backendDecorators "github.com/prebid/prebid-cache/backends/decorators"
"github.com/prebid/prebid-cache/metrics"
"github.com/prebid/prebid-cache/utils"
"github.com/sirupsen/logrus"
)
// PutHandler serves "POST /cache" requests. It bundles everything the handler
// methods in this file read: the storage back-end, the endpoint configuration,
// the object pools and the metrics engine.
type PutHandler struct {
// backend is the storage client whose Put() is called for every element to store.
backend backends.Backend
// cfg holds the endpoint limits and the custom-key policy.
cfg putHandlerConfig
// memory holds the sync.Pools that request and response objects are drawn from.
memory syncPools
// metrics is the engine used to record put totals, errors, bad requests,
// provided keys and durations.
metrics *metrics.Metrics
}
// putHandlerConfig carries the configuration values that NewPutHandler receives
// and the put endpoint consults on every request.
type putHandlerConfig struct {
// maxNumValues is the largest number of elements a single request may carry
// in its "puts" array; parseRequest rejects requests that exceed it.
maxNumValues int
// allowKeys tells whether a key supplied by the caller may be used instead of
// a generated UUID.
allowKeys bool
}
// syncPools groups the sync.Pool instances the put endpoint draws its request
// and response objects from.
type syncPools struct {
// requestPool hands out *putRequest values (see NewPutHandler).
requestPool sync.Pool
// putResponsePool hands out *PutResponse values (see NewPutHandler).
putResponsePool sync.Pool
}
// NewPutHandler builds a PutHandler around the given storage back-end, metrics
// engine and configuration values, and returns its handle method so it can be
// registered as the handler for POST requests on the "/cache" endpoint.
//
//   - storage is the back-end every element gets written to.
//   - metrics is the engine used to record put statistics.
//   - maxNumValues caps the number of elements accepted in a single request.
//   - allowKeys enables storing elements under caller-provided keys.
func NewPutHandler(storage backends.Backend, metrics *metrics.Metrics, maxNumValues int, allowKeys bool) func(http.ResponseWriter, *http.Request, httprouter.Params) {
	h := &PutHandler{
		backend: storage,
		metrics: metrics,
		cfg: putHandlerConfig{
			maxNumValues: maxNumValues,
			allowKeys:    allowKeys,
		},
	}

	// Teach each pool how to create a fresh object when it has none to reuse.
	h.memory.requestPool.New = func() interface{} { return &putRequest{} }
	h.memory.putResponsePool.New = func() interface{} { return new(PutResponse) }

	return h.handle
}
// parseRequest reads the body of the incoming put request and unmarshals it into
// a putRequest object taken from the request pool.
//
// It returns a utils.PUT_BAD_REQUEST error when the request (or its body) is nil,
// when the body cannot be read, or when it is not valid JSON for a putRequest; and
// a utils.PUT_MAX_NUM_VALUES error when the request carries more elements than the
// configured maximum. On every error path the pooled object is handed back to the
// pool; on success the caller owns the returned object and is responsible for
// returning it to the pool.
func (e *PutHandler) parseRequest(r *http.Request) (*putRequest, error) {
	if r == nil || r.Body == nil {
		return nil, utils.NewPBCError(utils.PUT_BAD_REQUEST)
	}
	// Register the close before reading so the body is also released when the
	// read itself fails.
	defer r.Body.Close()

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return nil, utils.NewPBCError(utils.PUT_BAD_REQUEST)
	}

	// Take a putRequest object from the pool and drop whatever a previous
	// request may have left in it.
	put := e.memory.requestPool.Get().(*putRequest)
	put.Puts = make([]putObject, 0)

	if err := json.Unmarshal(body, put); err != nil {
		// place memory back in sync pool
		e.memory.requestPool.Put(put)
		return nil, utils.NewPBCError(utils.PUT_BAD_REQUEST, string(body))
	}

	if len(put.Puts) > e.cfg.maxNumValues {
		// place memory back in sync pool
		e.memory.requestPool.Put(put)
		return nil, utils.NewPBCError(utils.PUT_MAX_NUM_VALUES, fmt.Sprintf("More keys than allowed: %d", e.cfg.maxNumValues))
	}

	return put, nil
}
// parsePutObject validates a single element of the "puts" array and builds the
// string that will be written to the back-end:
//   - XML content must be a JSON string; it gets un-escaped and prefixed with its type
//   - JSON content is kept as received and prefixed with its type
//
// An error is returned when the element has no value, a negative time-to-live,
// a malformed XML value, or a type other than XML or JSON.
func parsePutObject(p putObject) (string, error) {
	// Reject elements that cannot be stored no matter what their type is.
	switch {
	case len(p.Value) == 0:
		return "", utils.NewPBCError(utils.MISSING_VALUE)
	case p.TTLSeconds < 0:
		return "", utils.NewPBCError(utils.NEGATIVE_TTL, fmt.Sprintf("ttlseconds must not be negative %d.", p.TTLSeconds))
	}

	switch {
	case p.Type == utils.XML_PREFIX:
		// The XML travels inside a JSON string, so characters such as quotation
		// marks arrive escaped; undo that before treating the value as XML.
		xmlContent, err := unescapeXML(p.Value)
		if err != nil {
			return "", err
		}
		return p.Type + xmlContent, nil
	case p.Type == utils.JSON_PREFIX:
		return p.Type + string(p.Value), nil
	}

	return "", utils.NewPBCError(utils.UNSUPPORTED_DATA_TO_STORE, fmt.Sprintf("Type must be one of [\"json\", \"xml\"]. Found '%s'", p.Type))
}
// unescapeXML unmarshals rawXML into a string in order to un-escape its characters.
//
// rawXML must be a JSON string, that is, start and end with a double quote. A
// utils.MALFORMED_XML error is returned when it is empty, when it is not
// delimited by double quotes, or when it cannot be unmarshalled.
func unescapeXML(rawXML json.RawMessage) (string, error) {
	// The length check comes first so the index expressions can never panic on
	// an empty message.
	if len(rawXML) == 0 || rawXML[0] != byte('"') || rawXML[len(rawXML)-1] != byte('"') {
		return "", utils.NewPBCError(utils.MALFORMED_XML, fmt.Sprintf("XML messages must have a String value. Found %v", rawXML))
	}

	var interpreted string
	if err := json.Unmarshal(rawXML, &interpreted); err != nil {
		return "", utils.NewPBCError(utils.MALFORMED_XML, fmt.Sprintf("Error unmarshalling XML value: %v", rawXML))
	}

	return interpreted, nil
}
// classifyBackendError converts an error returned by the storage back-end into
// the Prebid Cache error reported to the caller:
//   - *backendDecorators.BadPayloadSize becomes utils.BAD_PAYLOAD_SIZE, and the
//     message names the index of the offending element
//   - context.DeadlineExceeded becomes utils.PUT_DEADLINE_EXCEEDED
//   - anything else becomes utils.PUT_INTERNAL_SERVER carrying the original message
//
// A nil err yields nil.
func classifyBackendError(err error, index int) error {
	// Nothing to classify; also keeps err.Error() below from dereferencing nil.
	if err == nil {
		return nil
	}

	if _, ok := err.(*backendDecorators.BadPayloadSize); ok {
		return utils.NewPBCError(utils.BAD_PAYLOAD_SIZE, fmt.Sprintf("POST /cache element %d exceeded max size: %v", index, err.Error()))
	}

	if err == context.DeadlineExceeded {
		return utils.NewPBCError(utils.PUT_DEADLINE_EXCEEDED)
	}

	return utils.NewPBCError(utils.PUT_INTERNAL_SERVER, err.Error())
}
// logBackendError writes err to the error log: first a generic line, then a
// second line that tells put timeouts apart from any other kind of error.
func logBackendError(err error) {
	logrus.Error("POST /cache Error while writing to the back-end: ", err)

	// The kind of a PBCError lives in its Type field (put compares Type against
	// utils.RECORD_EXISTS); StatusCode is the HTTP status handle writes to the
	// response, so it is not the field to match against PUT_DEADLINE_EXCEEDED.
	if pbcErr, isPBCErr := err.(utils.PBCError); isPBCErr && pbcErr.Type == utils.PUT_DEADLINE_EXCEEDED {
		logrus.Error("POST /cache timed out:", err)
	} else {
		logrus.Error("POST /cache had an unexpected error:", err)
	}
}
// handle is the function registered for the POST method of the `/cache` endpoint.
//
// It counts the request, delegates the work to processPutRequest and then either
// writes the JSON response and records how long the request took, or writes the
// error with its status code and records it as a bad request (4xx) or as an
// error (anything else). The ps parameter is not used.
func (e *PutHandler) handle(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
	e.metrics.RecordPutTotal()
	begin := time.Now()

	payload, putErr := e.processPutRequest(r)
	if putErr == nil {
		// Every element made it into the storage service or database.
		w.Header().Set("Content-Type", "application/json")
		w.Write(payload)
		e.metrics.RecordPutDuration(time.Since(begin))
		return
	}

	// At least one element could not be stored. processPutRequest is expected to
	// return utils.PBCError values only; anything else is treated as an internal
	// server error.
	status := http.StatusInternalServerError
	if pbcErr, ok := putErr.(utils.PBCError); ok {
		status = pbcErr.StatusCode
	}

	if status >= 400 && status < 500 {
		e.metrics.RecordPutBadRequest()
	} else {
		e.metrics.RecordPutError()
	}

	http.Error(w, putErr.Error(), status)
}
// processPutRequest parses and validates the incoming request, sends every
// element of its "puts" array to the back-end through putElements and marshals
// the resulting PutResponse.
//
// It returns the marshaled response, or the first error found while parsing,
// storing or marshaling. The pooled request and response objects are handed back
// to their pools when the function returns.
func (e *PutHandler) processPutRequest(r *http.Request) ([]byte, error) {
	req, err := e.parseRequest(r)
	if err != nil {
		return nil, err
	}
	defer e.memory.requestPool.Put(req)

	// Take a PutResponse object from the pool and size it so that every
	// element of the request gets its own response slot.
	resp := e.memory.putResponsePool.Get().(*PutResponse)
	defer e.memory.putResponsePool.Put(resp)
	resp.Responses = make([]putResponseObject, len(req.Puts))

	// Send elements to storage service or database
	if storeErr := e.putElements(req, resp); storeErr != nil {
		return nil, storeErr
	}

	encoded, err := json.Marshal(resp)
	if err != nil {
		return nil, utils.NewPBCError(utils.MARSHAL_RESPONSE)
	}

	return encoded, nil
}
// putElements runs put() on every element of put.Puts in parallel, each call
// writing into the slot of resps.Responses with the same index, and waits for all
// of them to finish. If any call left an error behind, the first one in request
// order is logged and returned.
//
// TODO: For those storage clients that support storing multiple elements in a single call, build a batch and send them together
// TODO: Allow Prebid Cache to provide error details in an "errors" field in the response
func (e *PutHandler) putElements(put *putRequest, resps *PutResponse) error {
	var pending sync.WaitGroup

	// One goroutine per element; each one works on its own request and
	// response slot.
	for i := range put.Puts {
		pending.Add(1)
		go e.put(&put.Puts[i], &resps.Responses[i], i, &pending)
	}
	pending.Wait()

	// Report the first failure, following the order of the request.
	for i := range resps.Responses {
		if failure := resps.Responses[i].err; failure != nil {
			logBackendError(failure)
			return failure
		}
	}

	return nil
}
// put validates and formats a single putObject and stores it through the Put()
// function of the back-end this Prebid Cache instance is using. The outcome is
// written into resp: the UUID the data was stored under, an empty UUID if the
// back-end reported that the record already exists, or an error. wg is marked
// as done when the function returns.
func (e *PutHandler) put(po *putObject, resp *putResponseObject, index int, wg *sync.WaitGroup) {
	defer wg.Done()

	payload, err := parsePutObject(*po)
	if err != nil {
		resp.err = err
		return
	}

	if e.cfg.allowKeys && len(po.Key) > 0 {
		// The element brings its own key and the configuration lets us use it.
		resp.UUID = po.Key
		e.metrics.RecordPutKeyProvided()
	} else if resp.UUID, err = utils.GenerateRandomID(); err != nil {
		// No usable custom key, and generating a random UUID failed.
		resp.UUID = ""
		resp.err = utils.NewPBCError(utils.PUT_INTERNAL_SERVER, "Error generating version 4 UUID")
		return
	}

	// Without a UUID there is nothing to store the element under.
	if len(resp.UUID) == 0 {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	err = e.backend.Put(ctx, resp.UUID, payload, po.TTLSeconds)
	if err == nil {
		return
	}

	pbcErr, isPBCErr := err.(utils.PBCError)
	if isPBCErr && pbcErr.Type == utils.RECORD_EXISTS {
		// The existing record did not get overwritten. This is not treated as a
		// failure: answer with an empty UUID instead of an error.
		resp.UUID = ""
		return
	}

	resp.err = classifyBackendError(err, index)
}
// putRequest is the object the body of a "POST /cache" request is unmarshalled into.
type putRequest struct {
// Puts lists the elements to store; parseRequest rejects requests that carry
// more of them than the configured maximum.
Puts []putObject `json:"puts"`
}
// putObject is a single element of the "puts" array of a put request.
type putObject struct {
// Type must equal utils.XML_PREFIX or utils.JSON_PREFIX; it is prepended to
// the value that gets stored.
Type string `json:"type"`
// TTLSeconds is the time-to-live handed to the back-end; negative values are rejected.
TTLSeconds int `json:"ttlseconds"`
// Value is the raw JSON of the data to store. It must not be empty, and for
// XML content it must be a JSON string.
Value json.RawMessage `json:"value"`
// Key is an optional caller-provided key, used only when custom keys are allowed.
Key string `json:"key"`
}
// putResponseObject holds the outcome of storing a single putObject.
type putResponseObject struct {
// UUID is the key the element was stored under. put() leaves it empty when
// the back-end reports that the record already exists.
UUID string `json:"uuid"`
// err is the error found while processing the element, if any. Being
// unexported, it is not part of the marshaled response.
err error
}
// PutResponse will be marshaled to be written into the http response
type PutResponse struct {
// Responses has one entry per element of the request's "puts" array, at the
// same index as the element it answers.
Responses []putResponseObject `json:"responses"`
}