Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use internode data for DisksInfo, VolsInfo in message pack #10821

Merged
Merged 1 commit on Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 0 additions & 48 deletions cmd/erasure-sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,6 @@ type erasureSets struct {

disksStorageInfoCache timedValue

// Merge tree walk
pool *MergeWalkPool
poolSplunk *MergeWalkPool
poolVersions *MergeWalkVersionsPool

mrfMU sync.Mutex
mrfOperations map[healSource]int
}
Expand Down Expand Up @@ -356,9 +351,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
disksConnectEvent: make(chan diskConnectInfo),
distributionAlgo: format.Erasure.DistributionAlgo,
deploymentID: uuid.MustParse(format.ID),
pool: NewMergeWalkPool(globalMergeLookupTimeout),
poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout),
poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout),
mrfOperations: make(map[healSource]int),
}

Expand Down Expand Up @@ -926,10 +918,6 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
return lentry, lexicallySortedEntryCount, isTruncated
}

// startMergeWalks starts merge walkers on every available disk in every
// erasure set for the given bucket/prefix and returns their entry channels.
// It is a convenience wrapper over startMergeWalksN that walks all disks
// and uses the regular (non-Splunk) walk mode.
func (s *erasureSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
	// -1 disk count means "use every disk"; false selects the standard walk.
	const allDisks = -1
	return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, allDisks, false)
}

// startMergeWalksVersions starts versioned merge walkers on every available
// disk in every erasure set for the given bucket/prefix and returns their
// entry channels. It is a convenience wrapper over startMergeWalksVersionsN
// that walks all disks.
func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh {
	// -1 disk count means "use every disk".
	const allDisks = -1
	return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, allDisks)
}
Expand Down Expand Up @@ -964,42 +952,6 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
return entryChs
}

// startMergeWalksN starts a walk on up to ndisks load-balanced disks of each
// erasure set and returns a slice of FileInfoCh that callers can read merged
// entries from. Disks whose walk fails to start are silently skipped, so the
// returned slice may contain fewer channels than disks walked.
//
// NOTE(review): the walks are started with GlobalContext rather than the
// ctx argument — presumably so they outlive a single request; confirm this
// is intentional, as ctx is otherwise unused here.
func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int, splunk bool) []FileInfoCh {
	var (
		mu      sync.Mutex // guards results across walker goroutines
		wg      sync.WaitGroup
		results []FileInfoCh
	)
	for _, set := range s.sets {
		for _, d := range set.getLoadBalancedNDisks(ndisks) {
			wg.Add(1)
			go func(storage StorageAPI) {
				defer wg.Done()

				var (
					ch  chan FileInfo
					err error
				)
				if splunk {
					ch, err = storage.WalkSplunk(GlobalContext, bucket, prefix, marker, endWalkCh)
				} else {
					ch, err = storage.Walk(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
				}
				if err != nil {
					// This disk's walk could not start; ignore it and
					// continue with the channels that did come up.
					return
				}

				mu.Lock()
				defer mu.Unlock()
				results = append(results, FileInfoCh{Ch: ch})
			}(d)
		}
	}
	wg.Wait()
	return results
}

func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
// In list multipart uploads we are going to treat input prefix as the object,
// this means that we are not supporting directory navigation.
Expand Down
File renamed without changes.
File renamed without changes.