Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recreate bucket metacache if corrupted #10800

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 32 additions & 5 deletions cmd/metacache-bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@ type bucketMetacache struct {
}

// newBucketMetacache creates a new bucketMetacache.
func newBucketMetacache(bucket string) *bucketMetacache {
// Optionally remove all existing caches.
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
if cleanup {
// Recursively delete all caches.
objAPI := newObjectLayerFn()
ez, ok := objAPI.(*erasureServerSets)
if ok {
ctx := context.Background()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
}
}
return &bucketMetacache{
bucket: bucket,
caches: make(map[string]metacache, 10),
Expand Down Expand Up @@ -97,19 +107,26 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
} else {
logger.LogIf(ctx, err)
}
return newBucketMetacache(bucket), err
if errors.Is(err, InsufficientReadQuorum{}) {
// Cache is likely lost. Clean up and return new.
return newBucketMetacache(bucket, true), nil
}
return newBucketMetacache(bucket, false), err
}
wg.Wait()
if decErr != nil {
if errors.Is(err, context.Canceled) {
return newBucketMetacache(bucket, false), err
}
// Log the error, but assume the data is lost and return a fresh bucket.
// Otherwise a broken cache will never recover.
logger.LogIf(ctx, decErr)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
// Sanity check...
if meta.bucket != bucket {
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
return &meta, nil
}
Expand Down Expand Up @@ -393,16 +410,26 @@ func (b *bucketMetacache) deleteAll() {
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureZones"))
return
}

b.updated = true
if !b.transient {
// Delete all.
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
b.caches = make(map[string]metacache, 10)
return
}

// Transient are in different buckets.
var wg sync.WaitGroup
for id := range b.caches {
wg.Add(1)
go func(cache metacache) {
defer wg.Done()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
}(b.caches[id])
delete(b.caches, id)
}
wg.Wait()
b.caches = make(map[string]metacache, 10)
}

// deleteCache will delete a specific cache and all files related to it across the cluster.
Expand Down
2 changes: 1 addition & 1 deletion cmd/metacache-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const metacacheManagerTransientBucket = "**transient**"
// initManager will start async saving the cache.
func (m *metacacheManager) initManager() {
// Add a transient bucket.
tb := newBucketMetacache(metacacheManagerTransientBucket)
tb := newBucketMetacache(metacacheManagerTransientBucket, false)
tb.transient = true
m.buckets[metacacheManagerTransientBucket] = tb

Expand Down