Skip to content

Commit

Permalink
Recreate bucket metacache if corrupted (#10800)
Browse files Browse the repository at this point in the history
If bucket metadata cannot be read, clean up existing and create a new.
  • Loading branch information
klauspost committed Oct 31, 2020
1 parent 422898d commit fe9f23e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
37 changes: 32 additions & 5 deletions cmd/metacache-bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@ type bucketMetacache struct {
}

// newBucketMetacache creates a new bucketMetacache.
func newBucketMetacache(bucket string) *bucketMetacache {
// Optionally remove all existing caches.
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
if cleanup {
// Recursively delete all caches.
objAPI := newObjectLayerFn()
ez, ok := objAPI.(*erasureServerSets)
if ok {
ctx := context.Background()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
}
}
return &bucketMetacache{
bucket: bucket,
caches: make(map[string]metacache, 10),
Expand Down Expand Up @@ -97,19 +107,26 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
} else {
logger.LogIf(ctx, err)
}
return newBucketMetacache(bucket), err
if errors.Is(err, InsufficientReadQuorum{}) {
// Cache is likely lost. Clean up and return new.
return newBucketMetacache(bucket, true), nil
}
return newBucketMetacache(bucket, false), err
}
wg.Wait()
if decErr != nil {
if errors.Is(err, context.Canceled) {
return newBucketMetacache(bucket, false), err
}
// Log the error, but assume the data is lost and return a fresh bucket.
// Otherwise a broken cache will never recover.
logger.LogIf(ctx, decErr)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
// Sanity check...
if meta.bucket != bucket {
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
return &meta, nil
}
Expand Down Expand Up @@ -393,16 +410,26 @@ func (b *bucketMetacache) deleteAll() {
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureZones"))
return
}

b.updated = true
if !b.transient {
// Delete all.
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
b.caches = make(map[string]metacache, 10)
return
}

// Transient are in different buckets.
var wg sync.WaitGroup
for id := range b.caches {
wg.Add(1)
go func(cache metacache) {
defer wg.Done()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
}(b.caches[id])
delete(b.caches, id)
}
wg.Wait()
b.caches = make(map[string]metacache, 10)
}

// deleteCache will delete a specific cache and all files related to it across the cluster.
Expand Down
2 changes: 1 addition & 1 deletion cmd/metacache-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const metacacheManagerTransientBucket = "**transient**"
// initManager will start async saving the cache.
func (m *metacacheManager) initManager() {
// Add a transient bucket.
tb := newBucketMetacache(metacacheManagerTransientBucket)
tb := newBucketMetacache(metacacheManagerTransientBucket, false)
tb.transient = true
m.buckets[metacacheManagerTransientBucket] = tb

Expand Down

0 comments on commit fe9f23e

Please sign in to comment.