Skip to content

Commit

Permalink
fix a crash from unstable sort for > 2 pools (#12501)
Browse files Browse the repository at this point in the history
Fix in #12487 assumes that slices with 
tiebreaks are sorted equally. That is only the case for "stable"  sort versions.
  • Loading branch information
klauspost committed Jun 14, 2021
1 parent 3197190 commit b89c0be
Showing 1 changed file with 67 additions and 61 deletions.
128 changes: 67 additions & 61 deletions cmd/erasure-server-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,19 +617,22 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
unlockOnDefer = true
}

errs := make([]error, len(z.serverPools))
grs := make([]*GetObjectReader, len(z.serverPools))

lockType = noLock // do not take locks at lower levels
checkPrecondFn := opts.CheckPrecondFn
opts.CheckPrecondFn = nil
results := make([]struct {
zIdx int
gr *GetObjectReader
err error
}, len(z.serverPools))

var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
grs[i], errs[i] = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
results[i].zIdx = i
results[i].gr, results[i].err = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
}(i, pool)
}
wg.Wait()
Expand All @@ -638,59 +641,62 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
less := func(i, j int) bool {
var (
mtime1 time.Time
mtime2 time.Time
)
if grs[i] != nil {
mtime1 = grs[i].ObjInfo.ModTime
sort.Slice(results, func(i, j int) bool {
var mtimeI, mtimeJ time.Time

if results[i].gr != nil {
mtimeI = results[i].gr.ObjInfo.ModTime
}
if grs[j] != nil {
mtime2 = grs[j].ObjInfo.ModTime
if results[j].gr != nil {
mtimeJ = results[j].gr.ObjInfo.ModTime
}
return mtime1.After(mtime2)
}
sort.Slice(errs, less)
sort.Slice(grs, less)
// On tiebreaks, choose the earliest.
if mtimeI.Equal(mtimeJ) {
return results[i].zIdx < results[j].zIdx
}
return mtimeI.After(mtimeJ)
})

var found = -1
for i, err := range errs {
if err == nil {
for i, res := range results {
if res.err == nil {
// Select earliest result
found = i
break
}
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
for _, grr := range grs {
if grr != nil {
grr.Close()
}
if !isErrObjectNotFound(res.err) && !isErrVersionNotFound(res.err) {
for _, res := range results {
res.gr.Close()
}
return gr, err
return nil, res.err
}
}

if found >= 0 {
if checkPrecondFn != nil && checkPrecondFn(grs[found].ObjInfo) {
for _, grr := range grs {
grr.Close()
}
return nil, PreConditionFailed{}
if found < 0 {
object = decodeDirObject(object)
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
for i, grr := range grs {
if i == found {
continue
}
grr.Close()
return gr, ObjectNotFound{Bucket: bucket, Object: object}
}

// Check preconditions.
if checkPrecondFn != nil && checkPrecondFn(results[found].gr.ObjInfo) {
for _, res := range results {
res.gr.Close()
}
return grs[found], nil
return nil, PreConditionFailed{}
}

object = decodeDirObject(object)
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
// Close all others
for i, res := range results {
if i == found {
continue
}
res.gr.Close()
}
return gr, ObjectNotFound{Bucket: bucket, Object: object}

return results[found].gr, nil
}

func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
Expand All @@ -713,15 +719,19 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)

errs := make([]error, len(z.serverPools))
objInfos := make([]ObjectInfo, len(z.serverPools))
results := make([]struct {
zIdx int
oi ObjectInfo
err error
}, len(z.serverPools))
opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, opts)
results[i].zIdx = i
results[i].oi, results[i].err = pool.GetObjectInfo(ctx, bucket, object, opts)
}(i, pool)
}
wg.Wait()
Expand All @@ -730,31 +740,27 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
less := func(i, j int) bool {
mtime1 := objInfos[i].ModTime
mtime2 := objInfos[j].ModTime
return mtime1.After(mtime2)
}
sort.Slice(errs, less)
sort.Slice(objInfos, less)

var found = -1
for i, err := range errs {
sort.Slice(results, func(i, j int) bool {
a, b := results[i], results[j]
if a.oi.ModTime.Equal(b.oi.ModTime) {
// On tiebreak, select the lowest zone index.
return a.zIdx < b.zIdx
}
return a.oi.ModTime.After(b.oi.ModTime)
})
for _, res := range results {
// Return first found.
err := res.err
if err == nil {
found = i
break
return res.oi, nil
}
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// some errors such as MethodNotAllowed for delete marker
// should be returned upwards.
return objInfos[i], err
return res.oi, err
}
}

if found >= 0 {
return objInfos[found], nil
}

object = decodeDirObject(object)
if opts.VersionID != "" {
return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
Expand Down

0 comments on commit b89c0be

Please sign in to comment.