Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Make WalkDir return errors #19677

Merged
9 commits merged on May 6, 2024
37 changes: 22 additions & 15 deletions cmd/batch-expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,17 +472,17 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo
stopFn(toDelCopy[i], err)
batchLogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", ri.Bucket, toDelCopy[i].ObjectName, toDelCopy[i].VersionID, err, attempts))
failed++
if attempts == retryAttempts { // all retry attempts failed, record failure
if oi, ok := oiCache.Get(toDelCopy[i]); ok {
ri.trackCurrentBucketObject(r.Bucket, *oi, false)
}
} else {
if oi, ok := oiCache.Get(toDelCopy[i]); ok {
ri.trackCurrentBucketObject(r.Bucket, *oi, false, attempts)
}
if attempts != retryAttempts {
// retry
toDel = append(toDel, toDelCopy[i])
}
} else {
stopFn(toDelCopy[i], nil)
if oi, ok := oiCache.Get(toDelCopy[i]); ok {
ri.trackCurrentBucketObject(r.Bucket, *oi, true)
ri.trackCurrentBucketObject(r.Bucket, *oi, true, attempts)
}
}
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
ctx, cancel := context.WithCancel(ctx)
defer cancel()

results := make(chan ObjectInfo, workerSize)
results := make(chan itemOrErr[ObjectInfo], workerSize)
if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{
Marker: lastObject,
LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions
Expand Down Expand Up @@ -584,11 +584,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
versionsCount int
toDel []expireObjInfo
)
failed := true
for result := range results {
if result.Err != nil {
failed = true
batchLogIf(ctx, result.Err)
continue
}

// Apply filter to find the matching rule to apply expiry
// actions accordingly.
// nolint:gocritic
if result.IsLatest {
if result.Item.IsLatest {
// send down filtered entries to be deleted using
// DeleteObjects method
if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously.
Expand All @@ -609,7 +616,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
var match BatchJobExpireFilter
var found bool
for _, rule := range r.Rules {
if rule.Matches(result, now) {
if rule.Matches(result.Item, now) {
match = rule
found = true
break
Expand All @@ -619,18 +626,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
continue
}

prevObj = result
prevObj = result.Item
matchedFilter = match
versionsCount = 1
// Include the latest version
if matchedFilter.Purge.RetainVersions == 0 {
toDel = append(toDel, expireObjInfo{
ObjectInfo: result,
ObjectInfo: result.Item,
ExpireAll: true,
})
continue
}
} else if prevObj.Name == result.Name {
} else if prevObj.Name == result.Item.Name {
if matchedFilter.Purge.RetainVersions == 0 {
continue // including latest version in toDel suffices, skipping other versions
}
Expand All @@ -643,7 +650,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
continue // retain versions
}
toDel = append(toDel, expireObjInfo{
ObjectInfo: result,
ObjectInfo: result.Item,
})
}
// Send any remaining objects downstream
Expand All @@ -658,8 +665,8 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
<-expireDoneCh // waits for the expire goroutine to complete
wk.Wait() // waits for all expire workers to retire

ri.Complete = ri.ObjectsFailed == 0
ri.Failed = ri.ObjectsFailed > 0
ri.Complete = !failed && ri.ObjectsFailed == 0
ri.Failed = failed || ri.ObjectsFailed > 0
globalBatchJobsMetrics.save(job.ID, ri)

// Close the saverQuitCh - this also triggers saving in-memory state
Expand Down
64 changes: 46 additions & 18 deletions cmd/batch-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
} else {
stopFn(oi, nil)
}
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success)
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success, attempts)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
Expand Down Expand Up @@ -690,6 +690,7 @@ type batchJobInfo struct {
StartTime time.Time `json:"startTime" msg:"st"`
LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
RetryAttempts int `json:"retryAttempts" msg:"ra"`
Attempts int `json:"attempts" msg:"at"`

Complete bool `json:"complete" msg:"cmp"`
Failed bool `json:"failed" msg:"fld"`
Expand Down Expand Up @@ -833,21 +834,35 @@ func (ri *batchJobInfo) clone() *batchJobInfo {
ObjectsFailed: ri.ObjectsFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
Attempts: ri.Attempts,
}
}

func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
func (ri *batchJobInfo) countItem(size int64, dmarker, success bool, attempt int) {
if ri == nil {
return
}
ri.Attempts++
klauspost marked this conversation as resolved.
Show resolved Hide resolved
if success {
if dmarker {
ri.DeleteMarkers++
} else {
ri.Objects++
ri.BytesTransferred += size
}
if attempt > 1 {
if dmarker {
ri.DeleteMarkersFailed--
} else {
ri.ObjectsFailed--
ri.BytesFailed += size
}
}
} else {
if attempt > 1 {
// Only count first attempt
return
}
if dmarker {
ri.DeleteMarkersFailed++
} else {
Expand Down Expand Up @@ -921,7 +936,7 @@ func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectIn
}
}

func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool) {
func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool, attempt int) {
if ri == nil {
return
}
Expand All @@ -931,7 +946,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo,

ri.Bucket = bucket
ri.Object = info.Name
ri.countItem(info.Size, info.DeleteMarker, success)
ri.countItem(info.Size, info.DeleteMarker, success, attempt)
}

func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) {
Expand All @@ -945,7 +960,7 @@ func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInf
ri.Bucket = bucket
for i := range batch {
ri.Object = batch[i].Name
ri.countItem(batch[i].Size, batch[i].DeleteMarker, true)
ri.countItem(batch[i].Size, batch[i].DeleteMarker, true, 1)
}
}

Expand Down Expand Up @@ -1057,8 +1072,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)

var (
walkCh = make(chan ObjectInfo, 100)
slowCh = make(chan ObjectInfo, 100)
walkCh = make(chan itemOrErr[ObjectInfo], 100)
slowCh = make(chan itemOrErr[ObjectInfo], 100)
)

if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
Expand All @@ -1084,7 +1099,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
if err := r.writeAsArchive(ctx, api, cl, batch); err != nil {
batchLogIf(ctx, err)
for _, b := range batch {
slowCh <- b
slowCh <- itemOrErr[ObjectInfo]{Item: b}
}
} else {
ri.trackCurrentBucketBatch(r.Source.Bucket, batch)
Expand All @@ -1095,12 +1110,12 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
}
}
for obj := range walkCh {
if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) {
if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) {
slowCh <- obj
continue
}

batch = append(batch, obj)
batch = append(batch, obj.Item)

if len(batch) < *r.Source.Snowball.Batch {
continue
Expand Down Expand Up @@ -1153,8 +1168,13 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
prevObj := ""

skipReplicate := false
for result := range slowCh {
result := result
for res := range slowCh {
if res.Err != nil {
ri.Failed = true
batchLogIf(ctx, res.Err)
continue
}
result := res.Item
if result.Name != prevObj {
prevObj = result.Name
skipReplicate = result.DeleteMarker && s3Type
Expand Down Expand Up @@ -1183,7 +1203,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} else {
stopFn(result, nil)
}
ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
ri.trackCurrentBucketObject(r.Source.Bucket, result, success, attempts)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
Expand Down Expand Up @@ -1484,7 +1504,7 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)
jobType = string(madmin.BatchJobReplicate)
}

resultCh := make(chan ObjectInfo)
resultCh := make(chan itemOrErr[ObjectInfo])

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -1496,8 +1516,12 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)

listResult := madmin.ListBatchJobsResult{}
for result := range resultCh {
if result.Err != nil {
batchLogIf(ctx, result.Err)
klauspost marked this conversation as resolved.
Show resolved Hide resolved
return
}
req := &BatchJobRequest{}
if err := req.load(ctx, objectAPI, result.Name); err != nil {
if err := req.load(ctx, objectAPI, result.Item.Name); err != nil {
if !errors.Is(err, errNoSuchJob) {
batchLogIf(ctx, err)
}
Expand Down Expand Up @@ -1702,20 +1726,24 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP
}

func (j *BatchJobPool) resume() {
results := make(chan ObjectInfo, 100)
results := make(chan itemOrErr[ObjectInfo], 100)
ctx, cancel := context.WithCancel(j.ctx)
defer cancel()
if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil {
batchLogIf(j.ctx, err)
return
}
for result := range results {
if result.Err != nil {
batchLogIf(j.ctx, result.Err)
continue
}
// ignore batch-replicate.bin and batch-rotate.bin entries
if strings.HasSuffix(result.Name, slashSeparator) {
if strings.HasSuffix(result.Item.Name, slashSeparator) {
continue
}
req := &BatchJobRequest{}
if err := req.load(ctx, j.objLayer, result.Name); err != nil {
if err := req.load(ctx, j.objLayer, result.Item.Name); err != nil {
batchLogIf(ctx, err)
continue
}
Expand Down
35 changes: 30 additions & 5 deletions cmd/batch-handlers_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading