avoid disk monitoring leaks under various conditions #18777

Merged — 2 commits, merged Jan 12, 2024
31 changes: 23 additions & 8 deletions cmd/erasure-sets.go
@@ -128,9 +128,11 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
     if errors.Is(err, errUnformattedDisk) {
         info, derr := disk.DiskInfo(context.TODO(), false)
         if derr != nil && info.RootDisk {
+            disk.Close()
             return nil, nil, fmt.Errorf("Drive: %s is a root drive", disk)
         }
     }
+    disk.Close()
     return nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
 }
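This first hunk sets the pattern the rest of the PR repeats in newErasureSets, markRootDisksAsDown, and HealFormat: once a StorageAPI handle has been opened, every early-return path must Close it, or the client and whatever goroutines it owns leak. A minimal, self-contained sketch of the same shape — Disk, openDisk, and probe are hypothetical stand-ins, not MinIO APIs:

    package main

    import (
        "errors"
        "fmt"
        "os"
    )

    // Disk is a hypothetical stand-in for a storage handle that owns
    // resources (file descriptors, goroutines) until Close is called.
    type Disk struct{ f *os.File }

    func (d *Disk) Close() error { return d.f.Close() }

    func openDisk(path string) (*Disk, error) {
        f, err := os.Open(path)
        if err != nil {
            return nil, err
        }
        return &Disk{f: f}, nil
    }

    func probe(d *Disk) error {
        if _, err := d.f.Stat(); err != nil {
            return errors.New("unreadable")
        }
        return nil
    }

    // connect mirrors connectEndpoint: after openDisk succeeds, every
    // failing path must Close the handle before returning, which is all
    // the two added disk.Close() calls above do.
    func connect(path string) (*Disk, error) {
        d, err := openDisk(path)
        if err != nil {
            return nil, err // nothing opened yet, nothing to release
        }
        if err := probe(d); err != nil {
            d.Close() // the fix: release before the early return
            return nil, fmt.Errorf("drive %s: %w", path, err)
        }
        return d, nil // caller now owns d and is responsible for Close
    }

    func main() {
        d, err := connect(os.TempDir())
        if err != nil {
            fmt.Println(err)
            return
        }
        defer d.Close()
        fmt.Println("connected")
    }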

@@ -413,6 +415,7 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
             globalLocalDrivesMu.RUnlock()
             continue
         }
+        disk.Close()
         disk = ldisk
         globalLocalDrivesMu.RUnlock()
     }
@@ -1053,6 +1056,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
         // We should not heal on root disk. i.e in a situation where the minio-administrator has unmounted a
         // defective drive we should not heal a path on the root disk.
         logger.LogIf(GlobalContext, fmt.Errorf("Drive `%s` is part of root drive, will not be used", storageDisks[i]))
+        storageDisks[i].Close()
         storageDisks[i] = nil
     }
 }
@@ -1062,7 +1066,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
 func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
     storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, storageOpts{
         cleanUp:     true,
-        healthCheck: true,
+        healthCheck: false,
     })
 
     defer func(storageDisks []StorageAPI) {
@@ -1132,6 +1136,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
         }
         if err := saveFormatErasure(storageDisks[index], format, formatOpID); err != nil {
             logger.LogIf(ctx, fmt.Errorf("Drive %s failed to write updated 'format.json': %v", storageDisks[index], err))
+            storageDisks[index].Close()
             tmpNewFormats[index] = nil // this disk failed to write new format
         }
     }
@@ -1154,17 +1159,27 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
         }
 
         if disk := storageDisks[index]; disk != nil {
-            disk.SetDiskLoc(s.poolIndex, m, n)
+            if disk.IsLocal() {
+                disk.SetDiskLoc(s.poolIndex, m, n)
 
-            if disk.IsLocal() && driveQuorum {
-                commonWrites, commonDeletes := calcCommonWritesDeletes(currentDisksInfo[m], (s.setDriveCount+1)/2)
                 xldisk, ok := disk.(*xlStorageDiskIDCheck)
                 if ok {
-                    xldisk.totalWrites.Add(commonWrites)
-                    xldisk.totalDeletes.Add(commonDeletes)
-                    xldisk.storage.setWriteAttribute(commonWrites)
-                    xldisk.storage.setDeleteAttribute(commonDeletes)
+                    if driveQuorum {
+                        commonWrites, commonDeletes := calcCommonWritesDeletes(currentDisksInfo[m], (s.setDriveCount+1)/2)
+                        xldisk.totalWrites.Store(commonWrites)
+                        xldisk.totalDeletes.Store(commonDeletes)
+                        xldisk.storage.setWriteAttribute(commonWrites)
+                        xldisk.storage.setDeleteAttribute(commonDeletes)
+                    }
+                    go xldisk.monitorDiskWritable(xldisk.diskCtx)
                 }
+            } else {
+                disk.Close() // Close the remote storage client, re-initialize with healthchecks.
+                disk, err = newStorageRESTClient(disk.Endpoint(), true, globalGrid.Load())
+                if err != nil {
+                    continue
+                }
+                disk.SetDiskLoc(s.poolIndex, m, n)
             }
 
             s.erasureDisks[m][n] = disk
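Taken together, the erasure-sets.go changes give HealFormat a probe-then-promote lifecycle: the temporary storageDisks are opened with healthCheck: false so throwaway handles never spawn monitoring goroutines, and only drives that actually land in s.erasureDisks acquire monitoring — local ones by explicitly starting monitorDiskWritable, remote ones by closing the bare client and re-dialing with health checks via newStorageRESTClient. A compressed sketch of that lifecycle, assuming a client whose monitor is a background goroutine (client, dial, and promote are illustrative, not MinIO's API):

    package main

    import "fmt"

    // client stands in for a storage client whose health monitor is a
    // background goroutine that must not leak.
    type client struct {
        addr      string
        monitored bool
        stop      chan struct{}
    }

    func dial(addr string, healthCheck bool) (*client, error) {
        c := &client{addr: addr, monitored: healthCheck, stop: make(chan struct{})}
        if healthCheck {
            // A real client would start its monitor here — exactly why
            // throwaway probe clients are dialed with healthCheck=false.
            go func() { <-c.stop }()
        }
        return c, nil
    }

    func (c *client) Close() { close(c.stop) }

    // promote mirrors the disk.Close() + newStorageRESTClient(..., true, ...)
    // step in the hunk above: drop the unmonitored probe, re-dial with
    // monitoring enabled, and only then install the client long-term.
    func promote(probe *client) (*client, error) {
        probe.Close()
        return dial(probe.addr, true)
    }

    func main() {
        probe, _ := dial("node1:9000", false) // cheap, no monitor goroutine
        live, _ := promote(probe)             // long-lived, monitored
        defer live.Close()
        fmt.Println(live.monitored) // true
    }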
31 changes: 21 additions & 10 deletions cmd/xl-storage-disk-id-check.go
@@ -228,8 +228,8 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDiskIDCheck {
     }
 
     if driveQuorum {
-        xl.totalWrites.Add(xl.storage.getWriteAttribute())
-        xl.totalDeletes.Add(xl.storage.getDeleteAttribute())
+        xl.totalWrites.Store(xl.storage.getWriteAttribute())
+        xl.totalDeletes.Store(xl.storage.getDeleteAttribute())
     }
 
     xl.diskCtx, xl.cancel = context.WithCancel(context.TODO())
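The Add → Store switch matters because these counters are restored from values the storage layer persisted, not accumulated: assuming totalWrites and totalDeletes are sync/atomic integers (which the Add/Store/Load method set suggests), Add is only correct when the counter is still zero, while Store makes the restore idempotent across re-initializations. A small demonstration:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var totalWrites atomic.Uint64 // same shape as the counter above

        persisted := uint64(42) // value recovered from the drive's attributes

        // Restoring with Add is only safe when the counter is zero; if the
        // restore ever runs twice, the count silently doubles.
        totalWrites.Add(persisted)
        totalWrites.Add(persisted)
        fmt.Println(totalWrites.Load()) // 84 — double-counted

        // Store is idempotent: any number of restores leaves the same state.
        totalWrites.Store(persisted)
        totalWrites.Store(persisted)
        fmt.Println(totalWrites.Load()) // 42
    }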
@@ -1032,37 +1032,50 @@ func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
     if t > maxTimeSinceLastSuccess {
         if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
             logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline, time since last response %v", globalLocalNodeName, p.storage.String(), t.Round(time.Millisecond)))
-            go p.monitorDiskStatus(t)
+            go p.monitorDiskStatus(0, mustGetUUID())
         }
         return errFaultyDisk
     }
     return nil
 }
 
+// Make sure we do not write O_DIRECT aligned I/O, because WriteAll() ends
+// up using the O_DIRECT codepath, which internally utilizes p.health.tokens;
+// we need to avoid using incoming I/O tokens as part of the healthcheck
+// monitoring I/O.
+var toWrite = []byte{2048: 42}
+
 // monitorDiskStatus should be called once when a drive has been marked offline.
 // Once the disk has been deemed ok, it will return to online status.
-func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration) {
+func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration, fn string) {
     t := time.NewTicker(5 * time.Second)
     defer t.Stop()
 
-    fn := mustGetUUID()
     for range t.C {
+        if contextCanceled(p.diskCtx) {
+            return
+        }
+
         if len(p.health.tokens) == 0 {
             // Queue is still full, no need to check.
             continue
         }
-        err := p.storage.WriteAll(context.Background(), minioMetaTmpBucket, fn, []byte{10000: 42})
+
+        err := p.storage.WriteAll(context.Background(), minioMetaTmpBucket, fn, toWrite)
         if err != nil {
             continue
         }
+
         b, err := p.storage.ReadAll(context.Background(), minioMetaTmpBucket, fn)
-        if err != nil || len(b) != 10001 {
+        if err != nil || len(b) != len(toWrite) {
             continue
         }
+
         err = p.storage.Delete(context.Background(), minioMetaTmpBucket, fn, DeleteOptions{
             Recursive: false,
             Immediate: false,
         })
+
         if err == nil {
             t := time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))
             if spent > 0 {
@@ -1108,8 +1121,6 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
     defer t.Stop()
     fn := mustGetUUID()
 
-    // Be just above directio size.
-    toWrite := []byte{xioutil.DirectioAlignSize + 1: 42}
     rng := rand.New(rand.NewSource(time.Now().UnixNano()))
 
     monitor := func() bool {
@@ -1129,7 +1140,7 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
     goOffline := func(err error, spent time.Duration) {
         if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
             logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
-            go p.monitorDiskStatus(spent)
+            go p.monitorDiskStatus(spent, fn)
         }
     }

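Two details of the shared probe buffer are easy to miss. []byte{2048: 42} is an indexed composite literal that allocates a 2049-byte slice — zero except for the final byte — which keeps the probe below the direct-I/O alignment threshold so WriteAll takes the buffered path and never draws on p.health.tokens, as the new comment explains. And verifying len(b) != len(toWrite) instead of the old magic 10001 ties the read-back check to whatever the probe actually wrote. A quick illustration of the literal:

    package main

    import "fmt"

    func main() {
        // An indexed composite literal's length is the highest index + 1:
        // 2049 bytes here, all zero except the last.
        toWrite := []byte{2048: 42}
        fmt.Println(len(toWrite), toWrite[0], toWrite[2048]) // 2049 0 42

        // The old probe used a separate, larger literal, which is where the
        // magic constant 10001 in the removed ReadAll check came from.
        old := []byte{10000: 42}
        fmt.Println(len(old)) // 10001

        // Checking against len(toWrite) keeps write and verify in sync.
        b := append([]byte(nil), toWrite...) // stand-in for ReadAll's result
        fmt.Println(len(b) == len(toWrite))  // true
    }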
10 changes: 5 additions & 5 deletions cmd/xl-storage.go
@@ -715,11 +715,7 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err error) {
     s.diskInfoCache.Once.Do(func() {
         s.diskInfoCache.TTL = time.Second
         s.diskInfoCache.Update = func() (interface{}, error) {
-            dcinfo := DiskInfo{
-                RootDisk:  s.rootDisk,
-                MountPath: s.drivePath,
-                Endpoint:  s.endpoint.String(),
-            }
+            dcinfo := DiskInfo{}
             di, err := getDiskInfo(s.drivePath)
             if err != nil {
                 return dcinfo, err
@@ -748,6 +744,10 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err error) {
     if v != nil {
         info = v.(DiskInfo)
     }
+
+    info.RootDisk = s.rootDisk
+    info.MountPath = s.drivePath
+    info.Endpoint = s.endpoint.String()
     info.Scanning = atomic.LoadInt32(&s.scanning) == 1
     return info, err
 }
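The xl-storage.go change splits DiskInfo into a cached part and an always-fresh part: the usage numbers stay behind the one-second TTL cache, while the identity fields (RootDisk, MountPath, Endpoint, Scanning) are stamped on every returned copy, so a stale or error-path cache value can no longer hand back a DiskInfo with those fields zeroed. A sketch of the same split — cache, info, and diskInfo are illustrative, not the MinIO types:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // info mirrors the split: Free is expensive and cached, MountPath is a
    // cheap identity field that must always be set.
    type info struct {
        Free      uint64
        MountPath string
    }

    type cache struct {
        mu      sync.Mutex
        val     uint64
        fetched time.Time
        ttl     time.Duration
    }

    func (c *cache) get(fetch func() uint64) uint64 {
        c.mu.Lock()
        defer c.mu.Unlock()
        if time.Since(c.fetched) > c.ttl {
            c.val = fetch()
            c.fetched = time.Now()
        }
        return c.val
    }

    // diskInfo stamps identity fields outside the cached closure, as the
    // hunk above now does, so they are correct even when the cached value
    // is stale or came from an error path.
    func diskInfo(c *cache, mount string) info {
        free := c.get(func() uint64 { return 1 << 30 }) // stand-in for getDiskInfo
        return info{Free: free, MountPath: mount}
    }

    func main() {
        c := &cache{ttl: time.Second}
        fmt.Println(diskInfo(c, "/mnt/drive1").MountPath)
    }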