Skip to content

Commit

Permalink
fix: simplify loading IAM users to avoid using regular ListObjects() (#…
Browse files Browse the repository at this point in the history
…13392)

- avoids relying in listQuorum from the underlying listObjects()
  and potentially missing entries if any.

- avoid the entire merging logic etc, listing raw set by set
  and loading whatever is found is cleaner when dealing with
  a large cluster for IAM metadata.
  • Loading branch information
harshavardhana committed Oct 12, 2021
1 parent 1e117b7 commit 13e41f2
Showing 1 changed file with 67 additions and 36 deletions.
103 changes: 67 additions & 36 deletions cmd/erasure-server-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1603,51 +1603,82 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
return err
}

if opts.WalkVersions {
go func() {
defer close(results)
ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
defer close(results)

var marker, versionIDMarker string
for {
loi, err := z.ListObjectVersions(ctx, bucket, prefix, marker, versionIDMarker, "", 1000)
if err != nil {
break
}
for _, erasureSet := range z.serverPools {
var wg sync.WaitGroup
for _, set := range erasureSet.sets {
set := set
wg.Add(1)
go func() {
defer wg.Done()

for _, obj := range loi.Objects {
results <- obj
}
disks, _ := set.getOnlineDisksWithHealing()
if len(disks) == 0 {
cancel()
return
}

if !loi.IsTruncated {
break
}
loadEntry := func(entry metaCacheEntry) {
if entry.isDir() {
return
}

marker = loi.NextMarker
versionIDMarker = loi.NextVersionIDMarker
}
}()
return nil
}
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
cancel()
return
}

go func() {
defer close(results)
for _, version := range fivs.Versions {
results <- version.ToObjectInfo(bucket, version.Name)
}
}

var marker string
for {
loi, err := z.ListObjects(ctx, bucket, prefix, marker, "", 1000)
if err != nil {
break
}
// How to resolve partial results.
resolver := metadataResolutionParams{
dirQuorum: 1,
objQuorum: 1,
bucket: bucket,
}

for _, obj := range loi.Objects {
results <- obj
}
path := baseDirFromPrefix(prefix)
if path == "" {
path = prefix
}

if !loi.IsTruncated {
break
}
lopts := listPathRawOptions{
disks: disks,
bucket: bucket,
path: path,
recursive: true,
forwardTo: "",
minDisks: 1,
reportNotFound: false,
agreed: loadEntry,
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
entry, ok := entries.resolve(&resolver)
if !ok {
// check if we can get one entry atleast
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}

loadEntry(*entry)
},
finished: nil,
}

marker = loi.NextMarker
if err := listPathRaw(ctx, lopts); err != nil {
logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
return
}
}()
}
wg.Wait()
}
}()

Expand Down

0 comments on commit 13e41f2

Please sign in to comment.