Skip to content

Commit

Permalink
Recreate bucket metacache if corrupted
Browse files Browse the repository at this point in the history
If bucket metadata cannot be read, clean up existing and create a new.
  • Loading branch information
klauspost committed Oct 31, 2020
1 parent 6135f07 commit 470bd66
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
37 changes: 32 additions & 5 deletions cmd/metacache-bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@ type bucketMetacache struct {
}

// newBucketMetacache creates a new bucketMetacache.
func newBucketMetacache(bucket string) *bucketMetacache {
// Optionally remove all existing caches.
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
if cleanup {
// Recursively delete all caches.
objAPI := newObjectLayerFn()
ez, ok := objAPI.(*erasureServerSets)
if ok {
ctx := context.Background()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
}
}
return &bucketMetacache{
bucket: bucket,
caches: make(map[string]metacache, 10),
Expand Down Expand Up @@ -97,19 +107,26 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
} else {
logger.LogIf(ctx, err)
}
return newBucketMetacache(bucket), err
if errors.Is(err, InsufficientReadQuorum{}) {
// Cache is likely lost. Clean up and return new.
return newBucketMetacache(bucket, true), nil
}
return newBucketMetacache(bucket, false), err
}
wg.Wait()
if decErr != nil {
if errors.Is(err, context.Canceled) {
return newBucketMetacache(bucket, false), err
}
// Log the error, but assume the data is lost and return a fresh bucket.
// Otherwise a broken cache will never recover.
logger.LogIf(ctx, decErr)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
// Sanity check...
if meta.bucket != bucket {
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
return &meta, nil
}
Expand Down Expand Up @@ -393,16 +410,26 @@ func (b *bucketMetacache) deleteAll() {
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureZones"))
return
}

b.updated = true
if !b.transient {
// Delete all.
logger.LogIf(ctx, ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator)))
b.caches = make(map[string]metacache, 10)
return
}

// Transient are in different buckets.
var wg sync.WaitGroup
for id := range b.caches {
wg.Add(1)
go func(cache metacache) {
defer wg.Done()
logger.LogIf(ctx, ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id)))
}(b.caches[id])
delete(b.caches, id)
}
wg.Wait()
b.caches = make(map[string]metacache, 10)
}

// deleteCache will delete a specific cache and all files related to it across the cluster.
Expand Down
2 changes: 1 addition & 1 deletion cmd/metacache-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const metacacheManagerTransientBucket = "**transient**"
// initManager will start async saving the cache.
func (m *metacacheManager) initManager() {
// Add a transient bucket.
tb := newBucketMetacache(metacacheManagerTransientBucket)
tb := newBucketMetacache(metacacheManagerTransientBucket, false)
tb.transient = true
m.buckets[metacacheManagerTransientBucket] = tb

Expand Down

0 comments on commit 470bd66

Please sign in to comment.