Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: serve always only the latest objects #12487

Merged
merged 1 commit into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
89 changes: 78 additions & 11 deletions cmd/erasure-server-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
return serverPools
}

// poolObjInfo represents the state of an object per pool
type poolObjInfo struct {
PoolIndex int
ObjInfo ObjectInfo
Err error
}

// getPoolIdxExisting returns the (first) found object pool index containing an object.
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
Expand All @@ -244,35 +251,49 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
return 0, nil
}

errs := make([]error, len(z.serverPools))
objInfos := make([]ObjectInfo, len(z.serverPools))
poolObjInfos := make([]poolObjInfo, len(z.serverPools))

var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
// remember the pool index, we may sort the slice original index might be lost.
pinfo := poolObjInfo{
PoolIndex: i,
}
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
poolObjInfos[i] = pinfo
}(i, pool)
}
wg.Wait()

for i, err := range errs {
if err != nil && !isErrObjectNotFound(err) {
return -1, err
// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
sort.Slice(poolObjInfos, func(i, j int) bool {
mtime1 := poolObjInfos[i].ObjInfo.ModTime
mtime2 := poolObjInfos[j].ObjInfo.ModTime
return mtime1.After(mtime2)
})

for _, pinfo := range poolObjInfos {
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return -1, pinfo.Err
}
if isErrObjectNotFound(err) {
if isErrObjectNotFound(pinfo.Err) {
// No object exists or its a delete marker,
// check objInfo to confirm.
if objInfos[i].DeleteMarker && objInfos[i].Name != "" {
return i, nil
if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
return pinfo.PoolIndex, nil
}

// objInfo is not valid, truly the object doesn't
// exist proceed to next pool.
continue
}
return i, nil
return pinfo.PoolIndex, nil
}

return -1, toObjectErr(errFileNotFound, bucket, object)
Expand Down Expand Up @@ -600,6 +621,9 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
grs := make([]*GetObjectReader, len(z.serverPools))

lockType = noLock // do not take locks at lower levels
checkPrecondFn := opts.CheckPrecondFn
opts.CheckPrecondFn = nil

var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
Expand All @@ -610,6 +634,26 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
}
wg.Wait()

// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
less := func(i, j int) bool {
var (
mtime1 time.Time
mtime2 time.Time
)
if grs[i] != nil {
mtime1 = grs[i].ObjInfo.ModTime
}
if grs[j] != nil {
mtime2 = grs[j].ObjInfo.ModTime
}
return mtime1.After(mtime2)
}
sort.Slice(errs, less)
sort.Slice(grs, less)

var found = -1
for i, err := range errs {
if err == nil {
Expand All @@ -627,6 +671,18 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
}

if found >= 0 {
if checkPrecondFn != nil && checkPrecondFn(grs[found].ObjInfo) {
for _, grr := range grs {
grr.Close()
}
return nil, PreConditionFailed{}
}
for i, grr := range grs {
if i == found {
continue
}
grr.Close()
}
return grs[found], nil
}

Expand Down Expand Up @@ -659,7 +715,6 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s

errs := make([]error, len(z.serverPools))
objInfos := make([]ObjectInfo, len(z.serverPools))

opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
var wg sync.WaitGroup
for i, pool := range z.serverPools {
Expand All @@ -671,6 +726,18 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
}
wg.Wait()

// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
less := func(i, j int) bool {
mtime1 := objInfos[i].ModTime
mtime2 := objInfos[j].ModTime
return mtime1.After(mtime2)
}
sort.Slice(errs, less)
sort.Slice(objInfos, less)

var found = -1
for i, err := range errs {
if err == nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/object-api-utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (

// Close - calls the cleanup actions in reverse order
func (g *GetObjectReader) Close() error {
if g == nil {
return nil
}
// sync.Once is used here to ensure that Close() is
// idempotent.
g.once.Do(func() {
Expand Down