s3splitfile_common.go
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
# ***** END LICENSE BLOCK *****/

package s3splitfile

import (
	"encoding/json"
	"fmt"
	"github.com/AdRoll/goamz/s3"
	"github.com/mozilla-services/heka/message"
	. "github.com/mozilla-services/heka/pipeline"
	"io"
	"io/ioutil"
	"math"
	"regexp"
	"sort"
	"strings"
)

type PublishAttempt struct {
	Name              string
	AttemptsRemaining uint32
}

// Encapsulates the directory-splitting schema
type Schema struct {
	Fields       []string
	FieldIndices map[string]int
	Dims         map[string]DimensionChecker
}

// Determine whether a given value is acceptable for a given field, and if not
// return a default value instead.
func (s *Schema) GetValue(field string, value string) (rvalue string, err error) {
	checker, ok := s.Dims[field]
	if !ok {
		return value, fmt.Errorf("No such field: '%s'", field)
	}
	if checker.IsAllowed(value) {
		return value, nil
	}
	return "OTHER", nil
}
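
// Usage sketch (illustrative, not from the original file): given a schema
// whose "reason" dimension only allows "idle-daily" and "saved-session",
// GetValue passes allowed values through and maps anything else to "OTHER":
//
//	v, _ := schema.GetValue("reason", "idle-daily") // "idle-daily"
//	v, _ = schema.GetValue("reason", "shutdown")    // "OTHER"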

// Extract all dimensions from the given pack.
func (s *Schema) GetDimensions(pack *PipelinePack) (dimensions []string) {
	dims := make([]string, len(s.Fields))
	for i := range dims {
		dims[i] = "UNKNOWN"
	}

	// TODO: add support for top-level message fields (Timestamp, etc)
	remaining := len(dims)
	for _, field := range pack.Message.Fields {
		if remaining == 0 {
			break
		}
		idx, ok := s.FieldIndices[field.GetName()]
		if ok {
			remaining--
			// We use the first available value, even if several have been
			// provided.
			inValue := field.GetValue()
			if inValue != nil {
				v, err := s.GetValue(field.GetName(), fmt.Sprintf("%v", inValue))
				if err != nil {
					fmt.Printf("How did this happen? %s\n", err)
				}
				if v != "" {
					dims[idx] = v
				} // Else the value was an empty string, leave as unknown.
			} // Else there were no values, leave this field as unknown.
		}
	}
	return dims
}
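
// Illustrative sketch (field values are hypothetical): with a schema like the
// example shown in the LoadSchema comment below, a message might yield
//
//	["20140120", "telemetry", "4", "saved-session", "Firefox", "nightly", "30.0a1"]
//
// in schema-field order. Fields missing from the message stay "UNKNOWN", and
// disallowed values are replaced with "OTHER".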

// Interface for calculating whether a particular value is acceptable
// as-is, or if it should be replaced with a default value.
type DimensionChecker interface {
	IsAllowed(v string) bool
	ListValues() ([]string, bool)
}

// Accept any value at all.
type AnyDimensionChecker struct{}

func (adc AnyDimensionChecker) IsAllowed(v string) bool {
	return true
}

func (adc AnyDimensionChecker) ListValues() ([]string, bool) {
	return nil, false
}

// Accept a specific list of values; anything not in the list
// is rejected.
type ListDimensionChecker struct {
	// Use a map instead of a list internally for fast lookups.
	allowed map[string]struct{}
}

func (ldc ListDimensionChecker) IsAllowed(v string) bool {
	_, ok := ldc.allowed[SanitizeDimension(v)]
	return ok
}

// Return the list of allowed values, sorted alphabetically.
func (ldc ListDimensionChecker) ListValues() ([]string, bool) {
	keys := make([]string, 0, len(ldc.allowed))
	for k := range ldc.allowed {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys, true
}

// Factory for creating a ListDimensionChecker using a list instead of a map
func NewListDimensionChecker(allowed []string) *ListDimensionChecker {
	dimMap := map[string]struct{}{}
	for _, a := range allowed {
		dimMap[SanitizeDimension(a)] = struct{}{}
	}
	return &ListDimensionChecker{dimMap}
}
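
// Usage sketch (illustrative): values are sanitized on the way in, so the
// checker compares the sanitized forms on both sides.
//
//	ldc := NewListDimensionChecker([]string{"idle-daily", "saved-session"})
//	ldc.IsAllowed("idle-daily") // true (both sides sanitize to "idle_daily")
//	ldc.IsAllowed("shutdown")   // false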

// If both are specified, accept any value between `min` and `max` (inclusive).
// If one of the bounds is missing, only enforce the other. If neither bound is
// present, accept all values.
type RangeDimensionChecker struct {
	min string
	max string
}

func (rdc RangeDimensionChecker) IsAllowed(v string) bool {
	// Min and max are optional, so treat them separately.
	// Note that Go compares strings lexicographically byte-by-byte, which
	// behaves as expected here for fixed-width ASCII values such as
	// yyyymmdd-style dates.
	if rdc.min != "" && rdc.min > v {
		return false
	}
	if rdc.max != "" && rdc.max < v {
		return false
	}
	return true
}

func (rdc RangeDimensionChecker) ListValues() ([]string, bool) {
	return nil, false
}
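
// Usage sketch (illustrative): date-style strings compare correctly because
// they are fixed-width.
//
//	rdc := RangeDimensionChecker{min: "20140120", max: "20140125"}
//	rdc.IsAllowed("20140123") // true
//	rdc.IsAllowed("20140126") // false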

// Pattern to use for sanitizing path/file components.
var sanitizePattern = regexp.MustCompile("[^a-zA-Z0-9_/.]")

// Given a string, return a sanitized version that can be used safely as part
// of a filename (for example).
func SanitizeDimension(dim string) (cleaned string) {
	return sanitizePattern.ReplaceAllString(dim, "_")
}
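
// For example (illustrative):
//
//	SanitizeDimension("saved-session") // "saved_session"
//	SanitizeDimension("30.0a1")        // "30.0a1" (letters, digits, and '.' pass through)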

// Load a schema from the given file name. The file is expected to contain
// valid JSON describing a hierarchy of dimensions, each of which specifies
// what values are "allowed" for that dimension.
// Example schema:
//   {
//     "version": 1,
//     "dimensions": [
//       { "field_name": "submissionDate",
//         "allowed_values": { "min": "20140120", "max": "20140125" }
//       },
//       { "field_name": "sourceName", "allowed_values": "*" },
//       { "field_name": "sourceVersion", "allowed_values": "*" },
//       { "field_name": "reason", "allowed_values":
//         [ "idle-daily", "saved-session" ]
//       },
//       { "field_name": "appName", "allowed_values":
//         [ "Firefox", "Fennec", "Thunderbird", "FirefoxOS", "B2G" ]
//       },
//       { "field_name": "appUpdateChannel", "allowed_values":
//         [ "default", "nightly", "aurora", "beta", "release", "esr" ]
//       },
//       { "field_name": "appVersion", "allowed_values": "*" }
//     ]
//   }
func LoadSchema(schemaFileName string) (schema Schema, err error) {
	// Placeholder for parsing JSON
	type JSchemaDimension struct {
		Field_name     string
		Allowed_values interface{}
	}

	// Placeholder for parsing JSON
	type JSchema struct {
		Version    int32
		Dimensions []JSchemaDimension
	}

	schemaBytes, err := ioutil.ReadFile(schemaFileName)
	if err != nil {
		return
	}

	var js JSchema
	err = json.Unmarshal(schemaBytes, &js)
	if err != nil {
		return
	}

	fields := make([]string, len(js.Dimensions))
	fieldIndices := map[string]int{}
	dims := map[string]DimensionChecker{}
	schema = Schema{fields, fieldIndices, dims}

	for i, d := range js.Dimensions {
		schema.Fields[i] = d.Field_name
		schema.FieldIndices[d.Field_name] = i
		switch av := d.Allowed_values.(type) {
		case string:
			if av == "*" {
				schema.Dims[d.Field_name] = AnyDimensionChecker{}
			} else {
				schema.Dims[d.Field_name] = NewListDimensionChecker([]string{av})
			}
		case []interface{}:
			allowed := make([]string, len(av))
			for i, v := range av {
				allowedValue, ok := v.(string)
				if !ok {
					return schema, fmt.Errorf("Entries in 'allowed_values' for field '%s' must be strings", d.Field_name)
				}
				allowed[i] = allowedValue
			}
			schema.Dims[d.Field_name] = NewListDimensionChecker(allowed)
		case map[string]interface{}:
			vMin, okMin := av["min"]
			vMax, okMax := av["max"]
			if !okMin && !okMax {
				return schema, fmt.Errorf("Range for field '%s' must have at least one of 'min' or 'max'", d.Field_name)
			}
			ok := false
			minStr := ""
			if okMin {
				minStr, ok = vMin.(string)
				if !ok {
					return schema, fmt.Errorf("Value of 'min' for field '%s' must be a string", d.Field_name)
				}
			}
			maxStr := ""
			if okMax {
				maxStr, ok = vMax.(string)
				if !ok {
					return schema, fmt.Errorf("Value of 'max' for field '%s' must be a string (it was %+v)", d.Field_name, vMax)
				}
			}
			schema.Dims[d.Field_name] = RangeDimensionChecker{minStr, maxStr}
		default:
			// Previously an unrecognized type fell through silently, leaving
			// the field with no checker; fail fast at load time instead.
			return schema, fmt.Errorf("Unrecognized 'allowed_values' for field '%s'", d.Field_name)
		}
	}
	return
}
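
// Usage sketch (illustrative; the schema path is hypothetical):
//
//	schema, err := LoadSchema("/etc/heka/telemetry_schema.json")
//	if err != nil {
//		// handle the error
//	}
//	dims := schema.GetDimensions(pack) // pack is a *PipelinePack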

var suffixes = [...]string{"", "K", "M", "G", "T", "P"}

// Return a nice, human-readable representation of the given number of bytes.
func PrettySize(bytes int64) string {
	fBytes := float64(bytes)
	sIdx := 0
	for i := range suffixes {
		sIdx = i
		if fBytes < math.Pow(1024.0, float64(sIdx+1)) {
			break
		}
	}
	pretty := fBytes / math.Pow(1024.0, float64(sIdx))
	return fmt.Sprintf("%.2f%sB", pretty, suffixes[sIdx])
}
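
// For example (illustrative):
//
//	PrettySize(1024)    // "1.00KB"
//	PrettySize(1536)    // "1.50KB"
//	PrettySize(1048576) // "1.00MB"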

// Maximum number of S3 List results to fetch at once.
const listBatchSize = 1000

// Maximum number of S3 Record results to queue at once.
const fileBatchSize = 300

// Encapsulates the result of a List operation, allowing detection of errors
// along the way.
type S3ListResult struct {
	Key s3.Key
	Err error
}

// List the contents of the given bucket, sending matching filenames to a
// channel which can be read by the caller.
func S3Iterator(bucket *s3.Bucket, prefix string, schema Schema) <-chan S3ListResult {
	keyChannel := make(chan S3ListResult, listBatchSize)
	go FilterS3(bucket, prefix, 0, schema, keyChannel)
	return keyChannel
}
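
// Usage sketch (illustrative; the bucket handle and prefix are hypothetical):
//
//	for r := range S3Iterator(bucket, "telemetry/", schema) {
//		if r.Err != nil {
//			// handle the listing error
//			continue
//		}
//		fmt.Println(r.Key.Key, r.Key.Size)
//	}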

// Recursively descend into an S3 directory tree, filtering based on the given
// schema, and sending results on the given channel. The `level` parameter
// indicates how far down the tree we are, and is used to determine which
// schema field we use for filtering.
func FilterS3(bucket *s3.Bucket, prefix string, level int, schema Schema, kc chan S3ListResult) {
	// Update the marker as we encounter keys / prefixes. If a response is
	// truncated, the next `List` request will start from the next item after
	// the marker.
	marker := ""

	// Keep listing if the response is incomplete (there are more than
	// `listBatchSize` entries or prefixes).
	done := false
	for !done {
		response, err := bucket.List(prefix, "/", marker, listBatchSize)
		if err != nil {
			fmt.Printf("Error listing: %s\n", err)
			// TODO: retry?
			kc <- S3ListResult{s3.Key{}, err}
			break
		}

		if !response.IsTruncated {
			// Response is not truncated, so we're done.
			done = true
		}

		if level >= len(schema.Fields) {
			// We are past all the dimensions - encountered items are now
			// S3 key names. We ignore any further prefixes and assume that the
			// specified schema is correct/complete.
			for _, k := range response.Contents {
				marker = k.Key
				kc <- S3ListResult{k, nil}
			}
		} else {
			// We are still looking at prefixes. Recursively list each one that
			// matches the specified schema's allowed values.
			field := schema.Dims[schema.Fields[level]]
			if values, ok := field.ListValues(); ok {
				// If we have a list of allowed values, check each one directly
				// instead of listing the entire bucket. This is MUCH faster in
				// the case of high-cardinality dimensions (more than 1000
				// unique values for the dimension).
				for _, v := range values {
					newPrefix := fmt.Sprintf("%s%s/", prefix, v)
					marker = newPrefix
					FilterS3(bucket, newPrefix, level+1, schema, kc)
				}
				done = true
			} else {
				// We have a Range or All type dimension, so list all values
				// and check if each one is allowed.
				for _, pf := range response.CommonPrefixes {
					// Get just the last piece of the prefix to check it as a
					// dimension. If we have '/foo/bar/baz', we just want 'baz'.
					stripped := pf[len(prefix) : len(pf)-1]
					allowed := field.IsAllowed(stripped)
					marker = pf
					if allowed {
						FilterS3(bucket, pf, level+1, schema, kc)
					}
				}
			}
		}
	}

	if level == 0 {
		// We traverse the tree in depth-first order, so once we've reached the
		// end at the root (level 0), we know we're done.
		// Note that things could be made faster by parallelizing the recursive
		// listing, but we would need some other mechanism to know when to
		// close the channel.
		close(kc)
	}
}

// Encapsulates a single record within an S3 file, allowing detection of errors
// along the way.
type S3Record struct {
	Key       string
	Offset    uint64
	BytesRead int
	Record    []byte
	Err       error
}

// Read the file at the given S3 key, starting from the given offset, sending
// each record to a channel which can be read by the caller.
func S3FileIterator(bucket *s3.Bucket, s3Key string, offset uint64) <-chan S3Record {
	recordChannel := make(chan S3Record, fileBatchSize)
	go ReadS3File(bucket, s3Key, offset, recordChannel)
	return recordChannel
}
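
// Usage sketch (illustrative; the bucket handle and key are hypothetical):
//
//	for r := range S3FileIterator(bucket, "foo/bar.heka", 0) {
//		if r.Err != nil {
//			// handle the read error; r.Offset is the last-good position
//			continue
//		}
//		process(r.Record) // process is a hypothetical record handler
//	}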

// Build an S3Record from the given values, copying the record bytes so the
// caller can safely retain them after the splitter's buffer is reused.
func makeS3Record(s3Key string, offset uint64, bytesRead int, data []byte, err error) (result S3Record) {
	r := S3Record{}
	r.BytesRead = bytesRead
	r.Err = err
	r.Key = s3Key
	r.Offset = offset
	r.Record = make([]byte, len(data))
	copy(r.Record, data)
	return r
}

// TODO: duplicated from heka-cat
func makeSplitterRunner() (SplitterRunner, error) {
	splitter := &HekaFramingSplitter{}
	config := splitter.ConfigStruct()
	err := splitter.Init(config)
	if err != nil {
		return nil, fmt.Errorf("Error initializing HekaFramingSplitter: %s", err)
	}
	srConfig := CommonSplitterConfig{}
	sRunner := NewSplitterRunner("HekaFramingSplitter", splitter, srConfig)
	return sRunner, nil
}

// Fetch a reader for the given key, starting from the given byte offset
// (using an HTTP Range request when the offset is non-zero).
// Callers must call Close() on rc.
func getS3Reader(bucket *s3.Bucket, s3Key string, offset uint64) (rc io.ReadCloser, err error) {
	if offset == 0 {
		rc, err = bucket.GetReader(s3Key)
		return
	}

	headers := map[string][]string{
		"Range": {fmt.Sprintf("bytes=%d-", offset)},
	}
	resp, err := bucket.GetResponseWithHeaders(s3Key, headers)
	if resp != nil {
		rc = resp.Body
	}
	return
}

// Read the file at the given S3 key from the given offset, splitting it into
// framed records and sending each one (or any error encountered) on
// recordChan. The channel is closed when reading is complete.
func ReadS3File(bucket *s3.Bucket, s3Key string, s3Offset uint64, recordChan chan S3Record) {
	defer close(recordChan)

	sRunner, err := makeSplitterRunner()
	if err != nil {
		recordChan <- S3Record{s3Key, 0, 0, []byte{}, err}
		return
	}

	reader, err := getS3Reader(bucket, s3Key, s3Offset)
	if reader != nil {
		defer reader.Close()
	}
	if err != nil {
		recordChan <- S3Record{s3Key, 0, 0, []byte{}, err}
		return
	}

	size := s3Offset
	offset := s3Offset
	done := false
	for !done {
		n, record, err := sRunner.GetRecordFromStream(reader)
		offset = size
		size += uint64(n)
		if err != nil {
			if err == io.EOF {
				lenRemaining := len(sRunner.GetRemainingData())
				if lenRemaining > 0 {
					// There was a partial message at the end of the stream.
					// Discard the leftover bytes.
					fmt.Printf("At EOF, len(remaining data) was %d\n", lenRemaining)
				}
				done = true
			} else if err == io.ErrShortBuffer {
				recordChan <- makeS3Record(s3Key, offset, n, record, fmt.Errorf("record exceeded MAX_RECORD_SIZE %d", message.MAX_RECORD_SIZE))
				continue
			} else {
				// Some other kind of error occurred.
				// Retry behaviour should be handled externally; we can restart
				// from the last-good location using the s3Offset parameter.
				recordChan <- makeS3Record(s3Key, offset, n, record, err)
				done = true
				continue
			}
		}

		if len(record) == 0 && !done {
			// This may happen if we did not read enough data to make a full
			// record.
			continue
		}
		recordChan <- makeS3Record(s3Key, offset, n, record, err)
	}
}

// Normalize a bucket prefix: strip leading and trailing slashes, then append
// a single trailing slash (unless the result is empty).
func CleanBucketPrefix(prefix string) (cleaned string) {
	cleaned = strings.Trim(prefix, "/")
	if cleaned != "" {
		cleaned += "/"
	}
	return
}
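
// For example (illustrative):
//
//	CleanBucketPrefix("/foo/bar/") // "foo/bar/"
//	CleanBucketPrefix("foo")       // "foo/"
//	CleanBucketPrefix("/")         // ""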