/
upload.go
211 lines (175 loc) · 5.09 KB
/
upload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package bloomfilter
import (
"archive/zip"
"context"
"strconv"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/internalpb"
"storj.io/uplink"
)
// LATEST is the name of the file that contains the most recently completed bloomfilter generation prefix.
// Its content is the timestamp-based prefix under which the newest set of bloom filter zip packs was uploaded.
const LATEST = "LATEST"
// Upload is used to upload bloom filters to specified bucket.
type Upload struct {
	log    *zap.Logger // logger used for operational warnings (e.g. non-empty target bucket)
	config Config      // carries access grant, bucket name, zip batch size and expiration settings
}
// NewUpload creates new upload for bloom filters.
func NewUpload(log *zap.Logger, config Config) *Upload {
	return &Upload{log: log, config: config}
}
// CheckConfig check configuration values.
func (bfu *Upload) CheckConfig() error {
	if bfu.config.AccessGrant == "" {
		return errs.New("Access Grant is not set")
	}
	if bfu.config.Bucket == "" {
		return errs.New("Bucket is not set")
	}
	return nil
}
// UploadBloomFilters stores a zipfile with multiple bloom filters in a bucket.
//
// Filters are grouped into zip packs of config.ZipBatchSize entries and
// uploaded under a unique timestamp-based prefix; on success the LATEST
// object is rewritten to contain that prefix. On any failure, objects
// already uploaded under the prefix are moved aside via cleanup so the
// next execution starts fresh.
func (bfu *Upload) UploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(retainInfos) == 0 {
		return nil
	}

	// single timestamp so the prefix and the expiration are derived from the
	// same instant (previously two separate time.Now() calls were used)
	now := time.Now()
	prefix := now.Format(time.RFC3339Nano)
	expirationTime := now.Add(bfu.config.ExpireIn)

	accessGrant, err := uplink.ParseAccess(bfu.config.AccessGrant)
	if err != nil {
		return err
	}

	project, err := uplink.OpenProject(ctx, accessGrant)
	if err != nil {
		return err
	}
	defer func() {
		// do cleanup in case of any error while uploading bloom filters
		if err != nil {
			// TODO should we drop whole bucket if cleanup will fail
			err = errs.Combine(err, bfu.cleanup(ctx, project, prefix))
		}
		err = errs.Combine(err, project.Close())
	}()

	_, err = project.EnsureBucket(ctx, bfu.config.Bucket)
	if err != nil {
		return err
	}

	// TODO move it before segment loop is started
	o := uplink.ListObjectsOptions{
		Prefix: prefix + "/",
	}
	iterator := project.ListObjects(ctx, bfu.config.Bucket, &o)
	for iterator.Next() {
		if iterator.Item().IsPrefix {
			continue
		}

		bfu.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", bfu.config.Bucket))
		return nil
	}
	// surface listing failures instead of silently treating the bucket as
	// empty (cleanup already checks iterator.Err(); this loop must too)
	if err := iterator.Err(); err != nil {
		return err
	}

	infos := make([]internalpb.RetainInfo, 0, bfu.config.ZipBatchSize)
	batchNumber := 0
	for nodeID, info := range retainInfos {
		infos = append(infos, internalpb.RetainInfo{
			Filter: info.Filter.Bytes(),
			// because bloom filters should be created from immutable database
			// snapshot we are using latest segment creation date
			CreationDate:  latestCreationDate,
			PieceCount:    int64(info.Count),
			StorageNodeId: nodeID,
		})

		if len(infos) == bfu.config.ZipBatchSize {
			err = bfu.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos)
			if err != nil {
				return err
			}

			infos = infos[:0]
			batchNumber++
		}
	}

	// upload rest of infos if any
	if err := bfu.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil {
		return err
	}

	// update LATEST file
	upload, err := project.UploadObject(ctx, bfu.config.Bucket, LATEST, nil)
	if err != nil {
		return err
	}
	if _, err := upload.Write([]byte(prefix)); err != nil {
		// abort the pending upload so a failed write doesn't leave it dangling
		return errs.Combine(err, upload.Abort())
	}
	return upload.Commit()
}
// uploadPack uploads single zip pack with multiple bloom filters.
// Each filter is stored as one zip entry named after its storage node ID;
// the object expires automatically at expirationTime.
func (bfu *Upload) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(infos) == 0 {
		return nil
	}

	key := prefix + "/bloomfilters-" + strconv.Itoa(batchNumber) + ".zip"
	upload, err := project.UploadObject(ctx, bfu.config.Bucket, key, &uplink.UploadOptions{
		Expires: expirationTime,
	})
	if err != nil {
		return err
	}

	archive := zip.NewWriter(upload)
	defer func() {
		// closing the archive flushes buffered zip data; commit only on full
		// success, otherwise abort so no partial object is left behind
		err = errs.Combine(err, archive.Close())
		if err != nil {
			err = errs.Combine(err, upload.Abort())
		} else {
			err = upload.Commit()
		}
	}()

	for i := range infos {
		serialized, err := pb.Marshal(&infos[i])
		if err != nil {
			return err
		}

		entry, err := archive.Create(infos[i].StorageNodeId.String())
		if err != nil {
			return err
		}

		n, err := entry.Write(serialized)
		if err != nil {
			return err
		}
		if n != len(serialized) {
			return errs.New("not whole bloom filter was written")
		}
	}
	return nil
}
// cleanup moves all objects from root location to unique prefix. Objects will be deleted
// automatically when expires.
func (bfu *Upload) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) {
	defer mon.Task()(&ctx)(&err)

	// error prefix is timestamped so repeated failed runs don't collide
	errPrefix := "upload-error-" + time.Now().Format(time.RFC3339)

	options := uplink.ListObjectsOptions{Prefix: prefix + "/"}
	objects := project.ListObjects(ctx, bfu.config.Bucket, &options)
	for objects.Next() {
		entry := objects.Item()
		if entry.IsPrefix {
			continue
		}

		target := prefix + "/" + errPrefix + "/" + entry.Key
		if moveErr := project.MoveObject(ctx, bfu.config.Bucket, entry.Key, bfu.config.Bucket, target, nil); moveErr != nil {
			return moveErr
		}
	}
	return objects.Err()
}