Skip to content

Commit

Permalink
Make WalkDir return errors (#19677)
Browse files Browse the repository at this point in the history
If used, 'opts.Marker` will cause many missed entries since results are returned 
unsorted, and pools are serialized.

Switch to fully concurrent listing and merging across pools to return sorted entries.
  • Loading branch information
klauspost committed May 6, 2024
1 parent 9a9a49a commit 847ee5a
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 75 deletions.
37 changes: 22 additions & 15 deletions cmd/batch-expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,17 +472,17 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo
stopFn(toDelCopy[i], err)
batchLogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", ri.Bucket, toDelCopy[i].ObjectName, toDelCopy[i].VersionID, err, attempts))
failed++
if attempts == retryAttempts { // all retry attempts failed, record failure
if oi, ok := oiCache.Get(toDelCopy[i]); ok {
ri.trackCurrentBucketObject(r.Bucket, *oi, false)
}
} else {
if oi, ok := oiCache.Get(toDelCopy[i]); ok {
ri.trackCurrentBucketObject(r.Bucket, *oi, false, attempts)
}
if attempts != retryAttempts {
// retry
toDel = append(toDel, toDelCopy[i])
}
} else {
stopFn(toDelCopy[i], nil)
if oi, ok := oiCache.Get(toDelCopy[i]); ok {
ri.trackCurrentBucketObject(r.Bucket, *oi, true)
ri.trackCurrentBucketObject(r.Bucket, *oi, true, attempts)
}
}
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
ctx, cancel := context.WithCancel(ctx)
defer cancel()

results := make(chan ObjectInfo, workerSize)
results := make(chan itemOrErr[ObjectInfo], workerSize)
if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{
Marker: lastObject,
LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions
Expand Down Expand Up @@ -584,11 +584,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
versionsCount int
toDel []expireObjInfo
)
failed := true
for result := range results {
if result.Err != nil {
failed = true
batchLogIf(ctx, result.Err)
continue
}

// Apply filter to find the matching rule to apply expiry
// actions accordingly.
// nolint:gocritic
if result.IsLatest {
if result.Item.IsLatest {
// send down filtered entries to be deleted using
// DeleteObjects method
if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously.
Expand All @@ -609,7 +616,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
var match BatchJobExpireFilter
var found bool
for _, rule := range r.Rules {
if rule.Matches(result, now) {
if rule.Matches(result.Item, now) {
match = rule
found = true
break
Expand All @@ -619,18 +626,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
continue
}

prevObj = result
prevObj = result.Item
matchedFilter = match
versionsCount = 1
// Include the latest version
if matchedFilter.Purge.RetainVersions == 0 {
toDel = append(toDel, expireObjInfo{
ObjectInfo: result,
ObjectInfo: result.Item,
ExpireAll: true,
})
continue
}
} else if prevObj.Name == result.Name {
} else if prevObj.Name == result.Item.Name {
if matchedFilter.Purge.RetainVersions == 0 {
continue // including latest version in toDel suffices, skipping other versions
}
Expand All @@ -643,7 +650,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
continue // retain versions
}
toDel = append(toDel, expireObjInfo{
ObjectInfo: result,
ObjectInfo: result.Item,
})
}
// Send any remaining objects downstream
Expand All @@ -658,8 +665,8 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
<-expireDoneCh // waits for the expire goroutine to complete
wk.Wait() // waits for all expire workers to retire

ri.Complete = ri.ObjectsFailed == 0
ri.Failed = ri.ObjectsFailed > 0
ri.Complete = !failed && ri.ObjectsFailed == 0
ri.Failed = failed || ri.ObjectsFailed > 0
globalBatchJobsMetrics.save(job.ID, ri)

// Close the saverQuitCh - this also triggers saving in-memory state
Expand Down
64 changes: 46 additions & 18 deletions cmd/batch-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
} else {
stopFn(oi, nil)
}
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success)
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success, attempts)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
Expand Down Expand Up @@ -690,6 +690,7 @@ type batchJobInfo struct {
StartTime time.Time `json:"startTime" msg:"st"`
LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
RetryAttempts int `json:"retryAttempts" msg:"ra"`
Attempts int `json:"attempts" msg:"at"`

Complete bool `json:"complete" msg:"cmp"`
Failed bool `json:"failed" msg:"fld"`
Expand Down Expand Up @@ -833,21 +834,35 @@ func (ri *batchJobInfo) clone() *batchJobInfo {
ObjectsFailed: ri.ObjectsFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
Attempts: ri.Attempts,
}
}

func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
func (ri *batchJobInfo) countItem(size int64, dmarker, success bool, attempt int) {
if ri == nil {
return
}
ri.Attempts++
if success {
if dmarker {
ri.DeleteMarkers++
} else {
ri.Objects++
ri.BytesTransferred += size
}
if attempt > 1 {
if dmarker {
ri.DeleteMarkersFailed--
} else {
ri.ObjectsFailed--
ri.BytesFailed += size
}
}
} else {
if attempt > 1 {
// Only count first attempt
return
}
if dmarker {
ri.DeleteMarkersFailed++
} else {
Expand Down Expand Up @@ -921,7 +936,7 @@ func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectIn
}
}

func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool) {
func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool, attempt int) {
if ri == nil {
return
}
Expand All @@ -931,7 +946,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo,

ri.Bucket = bucket
ri.Object = info.Name
ri.countItem(info.Size, info.DeleteMarker, success)
ri.countItem(info.Size, info.DeleteMarker, success, attempt)
}

func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) {
Expand All @@ -945,7 +960,7 @@ func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInf
ri.Bucket = bucket
for i := range batch {
ri.Object = batch[i].Name
ri.countItem(batch[i].Size, batch[i].DeleteMarker, true)
ri.countItem(batch[i].Size, batch[i].DeleteMarker, true, 1)
}
}

Expand Down Expand Up @@ -1057,8 +1072,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)

var (
walkCh = make(chan ObjectInfo, 100)
slowCh = make(chan ObjectInfo, 100)
walkCh = make(chan itemOrErr[ObjectInfo], 100)
slowCh = make(chan itemOrErr[ObjectInfo], 100)
)

if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
Expand All @@ -1084,7 +1099,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
if err := r.writeAsArchive(ctx, api, cl, batch); err != nil {
batchLogIf(ctx, err)
for _, b := range batch {
slowCh <- b
slowCh <- itemOrErr[ObjectInfo]{Item: b}
}
} else {
ri.trackCurrentBucketBatch(r.Source.Bucket, batch)
Expand All @@ -1095,12 +1110,12 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
}
}
for obj := range walkCh {
if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) {
if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) {
slowCh <- obj
continue
}

batch = append(batch, obj)
batch = append(batch, obj.Item)

if len(batch) < *r.Source.Snowball.Batch {
continue
Expand Down Expand Up @@ -1153,8 +1168,13 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
prevObj := ""

skipReplicate := false
for result := range slowCh {
result := result
for res := range slowCh {
if res.Err != nil {
ri.Failed = true
batchLogIf(ctx, res.Err)
continue
}
result := res.Item
if result.Name != prevObj {
prevObj = result.Name
skipReplicate = result.DeleteMarker && s3Type
Expand Down Expand Up @@ -1183,7 +1203,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} else {
stopFn(result, nil)
}
ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
ri.trackCurrentBucketObject(r.Source.Bucket, result, success, attempts)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
Expand Down Expand Up @@ -1484,7 +1504,7 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)
jobType = string(madmin.BatchJobReplicate)
}

resultCh := make(chan ObjectInfo)
resultCh := make(chan itemOrErr[ObjectInfo])

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -1496,8 +1516,12 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)

listResult := madmin.ListBatchJobsResult{}
for result := range resultCh {
if result.Err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, result.Err), r.URL)
return
}
req := &BatchJobRequest{}
if err := req.load(ctx, objectAPI, result.Name); err != nil {
if err := req.load(ctx, objectAPI, result.Item.Name); err != nil {
if !errors.Is(err, errNoSuchJob) {
batchLogIf(ctx, err)
}
Expand Down Expand Up @@ -1702,20 +1726,24 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP
}

func (j *BatchJobPool) resume() {
results := make(chan ObjectInfo, 100)
results := make(chan itemOrErr[ObjectInfo], 100)
ctx, cancel := context.WithCancel(j.ctx)
defer cancel()
if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil {
batchLogIf(j.ctx, err)
return
}
for result := range results {
if result.Err != nil {
batchLogIf(j.ctx, result.Err)
continue
}
// ignore batch-replicate.bin and batch-rotate.bin entries
if strings.HasSuffix(result.Name, slashSeparator) {
if strings.HasSuffix(result.Item.Name, slashSeparator) {
continue
}
req := &BatchJobRequest{}
if err := req.load(ctx, j.objLayer, result.Name); err != nil {
if err := req.load(ctx, j.objLayer, result.Item.Name); err != nil {
batchLogIf(ctx, err)
continue
}
Expand Down
35 changes: 30 additions & 5 deletions cmd/batch-handlers_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 847ee5a

Please sign in to comment.