-
Notifications
You must be signed in to change notification settings - Fork 351
/
inventory.go
165 lines (151 loc) · 5.3 KB
/
inventory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package s3
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"sort"
"strings"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/cloud/aws/s3inventory"
"github.com/treeverse/lakefs/pkg/logging"
)
// ErrInventoryFilesRangesOverlap is returned by sortManifest when, after sorting,
// an inventory file begins before the previous one ends.
var ErrInventoryFilesRangesOverlap = errors.New("got s3 inventory with files covering overlapping ranges")
// Manifest is the decoded form of an S3 inventory manifest.json document,
// plus a few fields filled in by loadManifest after decoding.
type Manifest struct {
// URL is the location the manifest was loaded from; it is never read from or written to JSON.
URL string `json:"-"`
// InventoryBucketArn is the "destinationBucket" value; loadManifest parses it as an ARN.
InventoryBucketArn string `json:"destinationBucket"`
// SourceBucket is the "sourceBucket" value, reported by Inventory.SourceName.
SourceBucket string `json:"sourceBucket"`
Files []InventoryFile `json:"files"` // inventory list files, each contains a list of objects
// Format is the "fileFormat" value; loadManifest accepts only the ORC and Parquet format names.
Format string `json:"fileFormat"`
// CreationTimestamp is the raw "creationTimestamp" value, kept as a string.
CreationTimestamp string `json:"creationTimestamp"`
// inventoryBucket is the resource part of InventoryBucketArn, set by loadManifest.
inventoryBucket string
}
// InventoryFile is a single entry of the manifest's "files" list.
type InventoryFile struct {
Key string `json:"key"` // an s3 key for an inventory list file
// firstKey and lastKey are not decoded from JSON; sortManifest fills them from
// the file's metadata reader (FirstObjectKey / LastObjectKey).
firstKey string
lastKey string
}
// GenerateInventory loads the inventory manifest found at manifestURL and
// builds a block.Inventory from it. Inventory files are read through an
// s3inventory reader backed by the S3 client obtained for the manifest's
// inventory bucket. An error from loading the manifest is returned unchanged.
func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool, prefixes []string) (block.Inventory, error) {
	manifest, err := a.loadManifest(ctx, manifestURL)
	if err != nil {
		return nil, err
	}
	client := a.clients.Get(ctx, manifest.inventoryBucket)
	reader := s3inventory.NewReader(ctx, client, logger)
	return GenerateInventory(logger, manifest, reader, shouldSort, prefixes)
}
// GenerateInventory builds a block.Inventory from an already loaded manifest.
//
// A nil logger is replaced by logging.Default(). When shouldSort is set or any
// prefixes are given, sortManifest is run on m first and its error, if any, is
// returned as is. When prefixes are given, m.Files is then narrowed with
// filterFiles. Note that m is modified in place.
func GenerateInventory(logger logging.Logger, m *Manifest, inventoryReader s3inventory.IReader, shouldSort bool, prefixes []string) (block.Inventory, error) {
	if logger == nil {
		logger = logging.Default()
	}
	hasPrefixes := len(prefixes) > 0
	if shouldSort || hasPrefixes {
		err := sortManifest(m, logger, inventoryReader)
		if err != nil {
			return nil, err
		}
	}
	if hasPrefixes {
		before := len(m.Files)
		m.Files = filterFiles(m.Files, prefixes)
		logger.Debugf("manifest filtered from %d to %d files", before, len(m.Files))
	}
	inv := &Inventory{
		Manifest:   m,
		logger:     logger,
		shouldSort: shouldSort,
		reader:     inventoryReader,
		prefixes:   prefixes,
	}
	return inv, nil
}
// Inventory bundles a manifest with the settings GenerateInventory was called with.
type Inventory struct {
// Manifest is the manifest this inventory was generated from.
Manifest *Manifest
// logger, shouldSort, reader and prefixes are stored exactly as passed to
// GenerateInventory (logger defaults to logging.Default() when nil was given).
logger logging.Logger
shouldSort bool
reader s3inventory.IReader
prefixes []string
}
// Iterator returns a fresh iterator over this inventory, as created by
// NewInventoryIterator.
func (inv *Inventory) Iterator() block.InventoryIterator {
	it := NewInventoryIterator(inv)
	return it
}
// SourceName returns the source bucket recorded in the inventory's manifest.
func (inv *Inventory) SourceName() string {
	manifest := inv.Manifest
	return manifest.SourceBucket
}
// InventoryURL returns the URL stored on the inventory's manifest.
func (inv *Inventory) InventoryURL() string {
	manifest := inv.Manifest
	return manifest.URL
}
// loadManifest downloads and decodes the inventory manifest located at manifestURL.
//
// The URL's host is used as the bucket name and its path as the object key.
// The manifest's format must equal s3inventory.OrcFormatName or
// s3inventory.ParquetFormatName; otherwise an error wrapping
// s3inventory.ErrUnsupportedInventoryFormat is returned. On success the
// returned manifest has URL set to manifestURL and inventoryBucket set to the
// resource part of the parsed InventoryBucketArn.
//
// Errors from URL parsing and JSON decoding are returned as is; errors from
// fetching the object and from parsing the ARN are wrapped with context.
func (a *Adapter) loadManifest(ctx context.Context, manifestURL string) (*Manifest, error) {
	u, err := url.Parse(manifestURL)
	if err != nil {
		return nil, err
	}
	output, err := a.clients.Get(ctx, u.Host).GetObject(&s3.GetObjectInput{Bucket: &u.Host, Key: &u.Path})
	if err != nil {
		return nil, fmt.Errorf("%w: failed to read manifest.json from %s", err, manifestURL)
	}
	// The response body must be closed on every path, otherwise the underlying
	// connection is leaked. A close error after reading carries no useful
	// information for the caller, so it is deliberately discarded.
	defer func() { _ = output.Body.Close() }()

	var m Manifest
	if err := json.NewDecoder(output.Body).Decode(&m); err != nil {
		return nil, err
	}
	if m.Format != s3inventory.OrcFormatName && m.Format != s3inventory.ParquetFormatName {
		return nil, fmt.Errorf("%w. got format: %s", s3inventory.ErrUnsupportedInventoryFormat, m.Format)
	}
	m.URL = manifestURL
	inventoryBucketArn, err := arn.Parse(m.InventoryBucketArn)
	if err != nil {
		return nil, fmt.Errorf("failed to parse inventory bucket arn: %w", err)
	}
	m.inventoryBucket = inventoryBucketArn.Resource
	return &m, nil
}
// filterFiles returns the files that may contain keys starting with one of the
// given prefixes.
//
// The single forward pass over files and prefixes assumes files is ordered by
// firstKey and that firstKey/lastKey are populated (sortManifest does both).
// prefixes is sorted in place, so the caller's slice is reordered.
//
// With no prefixes there is nothing to filter by and files is returned
// unchanged. Otherwise a file is kept when its first key starts with the
// current prefix, or when the prefix lies within [firstKey, lastKey]. The
// upper bound is inclusive: a file whose last key equals the prefix holds a
// matching key.
func filterFiles(files []InventoryFile, prefixes []string) []InventoryFile {
	if len(prefixes) == 0 {
		// Guard: the loop below indexes prefixes unconditionally and would panic.
		return files
	}
	sort.Strings(prefixes)
	currentPrefixIdx := 0
	filteredFiles := make([]InventoryFile, 0)
	for i := range files {
		file := &files[i]
		for {
			// find a prefix that may have suitable keys in the current file:
			// either it sorts at/after the file's first key, or the first key
			// itself starts with it
			prefix := prefixes[currentPrefixIdx]
			if prefix >= file.firstKey || strings.HasPrefix(file.firstKey, prefix) {
				break
			}
			// current prefix ends before this file - move to next prefix
			currentPrefixIdx++
			if currentPrefixIdx == len(prefixes) {
				// no more prefixes - other files are irrelevant
				return filteredFiles
			}
		}
		prefix := prefixes[currentPrefixIdx]
		if strings.HasPrefix(file.firstKey, prefix) ||
			(prefix >= file.firstKey && prefix <= file.lastKey) {
			// file may contain keys starting with this prefix
			filteredFiles = append(filteredFiles, *file)
		}
	}
	return filteredFiles
}
// sortManifest records the first and last object key of every inventory file
// in m, orders m.Files by those keys, and verifies the files do not overlap.
//
// Key ranges come from reader.GetMetadataReader; a failure to obtain a
// metadata reader aborts with a wrapped error, while a failure to close one is
// only logged. Files are ordered by firstKey, then lastKey. If any file starts
// before its predecessor ends, ErrInventoryFilesRangesOverlap is returned.
func sortManifest(m *Manifest, logger logging.Logger, reader s3inventory.IReader) error {
	for idx := range m.Files {
		// Snapshot of the entry taken before its key range is filled in; this
		// copy is what gets attached to the log record below.
		file := m.Files[idx]
		metadata, err := reader.GetMetadataReader(m.Format, m.inventoryBucket, file.Key)
		if err != nil {
			return fmt.Errorf("failed to sort inventory files in manifest: %w", err)
		}
		m.Files[idx].firstKey = metadata.FirstObjectKey()
		m.Files[idx].lastKey = metadata.LastObjectKey()
		if closeErr := metadata.Close(); closeErr != nil {
			logger.WithError(closeErr).WithField("file", file).Error("Failed to close inventory file")
		}
	}

	sort.Slice(m.Files, func(i, j int) bool {
		left, right := m.Files[i], m.Files[j]
		if left.firstKey != right.firstKey {
			return left.firstKey < right.firstKey
		}
		return left.lastKey < right.lastKey
	})

	// validate sorting: a file that begins before the previous one ends means
	// the files cover overlapping ranges, which we don't know how to handle.
	for next := 1; next < len(m.Files); next++ {
		if m.Files[next].firstKey < m.Files[next-1].lastKey {
			return ErrInventoryFilesRangesOverlap
		}
	}
	return nil
}