diff --git a/cli/command_content_gc.go b/cli/command_content_gc.go index acb35f9818..f21897ce2f 100644 --- a/cli/command_content_gc.go +++ b/cli/command_content_gc.go @@ -2,20 +2,30 @@ package cli import ( "context" + "sync" "github.com/pkg/errors" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/blob" ) var ( contentGarbageCollectCommand = contentCommands.Command("gc", "Garbage-collect unused blobs") contentGarbageCollectCommandDelete = contentGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String() + contentGarbageCollectParallel = contentGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Int() ) func runContentGarbageCollectCommand(ctx context.Context, rep *repo.Repository) error { - unused, err := rep.Content.FindUnreferencedBlobs(ctx) - if err != nil { + var mu sync.Mutex + var unused []blob.Metadata + + if err := rep.Content.IterateUnreferencedBlobs(ctx, *contentGarbageCollectParallel, func(bm blob.Metadata) error { + mu.Lock() + unused = append(unused, bm) + mu.Unlock() + return nil + }); err != nil { return errors.Wrap(err, "error looking for unreferenced blobs") } @@ -27,7 +37,6 @@ func runContentGarbageCollectCommand(ctx context.Context, rep *repo.Repository) if *contentGarbageCollectCommandDelete != "yes" { var totalBytes int64 for _, u := range unused { - printStderr("unused %v (%v bytes)\n", u.BlobID, u.Length) totalBytes += u.Length } printStderr("Would delete %v unused blobs (%v bytes), pass '--delete=yes' to actually delete.\n", len(unused), totalBytes) diff --git a/cli/command_content_list.go b/cli/command_content_list.go index 009cae59ca..a2630973f8 100644 --- a/cli/command_content_list.go +++ b/cli/command_content_list.go @@ -3,11 +3,11 @@ package cli import ( "context" "fmt" + "sync/atomic" "github.com/pkg/errors" "github.com/kopia/kopia/repo" - "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content" ) @@ -22,45 +22,45 @@ var ( ) func runContentListCommand(ctx context.Context, rep *repo.Repository) error { - var count int + var count int32 var totalSize int64 - uniquePacks := map[blob.ID]bool{} - err := rep.Content.IterateContents(content.ID(*contentListPrefix), *contentListIncludeDeleted || *contentListDeletedOnly, func(b content.Info) error { - if *contentListDeletedOnly && !b.Deleted { - return nil - } - totalSize += int64(b.Length) - count++ - if b.PackBlobID != "" { - uniquePacks[b.PackBlobID] = true - } - if *contentListLong { - optionalDeleted := "" - if b.Deleted { - optionalDeleted = " (deleted)" + err := rep.Content.IterateContents( + content.IterateOptions{ + Prefix: content.ID(*contentListPrefix), + IncludeDeleted: *contentListIncludeDeleted || *contentListDeletedOnly, + }, + func(b content.Info) error { + if *contentListDeletedOnly && !b.Deleted { + return nil + } + atomic.AddInt64(&totalSize, int64(b.Length)) + atomic.AddInt32(&count, 1) + if *contentListLong { + optionalDeleted := "" + if b.Deleted { + optionalDeleted = " (deleted)" + } + fmt.Printf("%v %v %v %v+%v%v\n", + b.ID, + formatTimestamp(b.Timestamp()), + b.PackBlobID, + b.PackOffset, + maybeHumanReadableBytes(*contentListHuman, int64(b.Length)), + optionalDeleted) + } else { + fmt.Printf("%v\n", b.ID) } - fmt.Printf("%v %v %v %v+%v%v\n", - b.ID, - formatTimestamp(b.Timestamp()), - b.PackBlobID, - b.PackOffset, - maybeHumanReadableBytes(*contentListHuman, int64(b.Length)), - optionalDeleted) - } else { - fmt.Printf("%v\n", b.ID) - } - return nil - }) + return nil + }) if err != nil { return errors.Wrap(err, "error iterating") } if *contentListSummary { - fmt.Printf("Total: %v contents, %v packs, %v total size\n", + fmt.Printf("Total: %v contents, %v total size\n", maybeHumanReadableCount(*contentListHuman, int64(count)), - maybeHumanReadableCount(*contentListHuman, int64(len(uniquePacks))), maybeHumanReadableBytes(*contentListHuman, totalSize)) } diff --git a/cli/command_content_rewrite.go b/cli/command_content_rewrite.go index 27b7b89228..617ac7710e 100644 --- a/cli/command_content_rewrite.go +++ b/cli/command_content_rewrite.go @@ -4,7 +4,6 @@ import ( "context" "strings" "sync" - "time" "github.com/pkg/errors" @@ -130,28 +129,26 @@ func findContentInfos(ctx context.Context, rep *repo.Repository, ch chan content } func findContentWithFormatVersion(rep *repo.Repository, ch chan contentInfoOrError, version int) { - _ = rep.Content.IterateContents("", true, func(b content.Info) error { - if int(b.FormatVersion) == version && strings.HasPrefix(string(b.PackBlobID), *contentRewritePackPrefix) { - ch <- contentInfoOrError{Info: b} - } - return nil - }) + _ = rep.Content.IterateContents( + content.IterateOptions{IncludeDeleted: true}, + func(b content.Info) error { + if int(b.FormatVersion) == version && strings.HasPrefix(string(b.PackBlobID), *contentRewritePackPrefix) { + ch <- contentInfoOrError{Info: b} + } + return nil + }) } func findContentInShortPacks(rep *repo.Repository, ch chan contentInfoOrError, threshold int64) { - t0 := time.Now() - contentIDs, err := rep.Content.FindContentInShortPacks(threshold) - log.Infof("content in short packs determined in %v", time.Since(t0)) - if err != nil { + if err := rep.Content.IterateContentInShortPacks(threshold, func(ci content.Info) error { + if ci.ID.HasPrefix() == *contentRewritePrefixed { + ch <- contentInfoOrError{Info: ci} + } + return nil + }); err != nil { ch <- contentInfoOrError{err: err} return } - - for _, b := range contentIDs { - if b.ID.HasPrefix() == *contentRewritePrefixed { - ch <- contentInfoOrError{Info: b} - } - } } func init() { diff --git a/cli/command_content_stats.go b/cli/command_content_stats.go index d7595949a6..f8660e0400 100644 --- a/cli/command_content_stats.go +++ b/cli/command_content_stats.go @@ -3,7 +3,6 @@ package cli import ( "context" "fmt" - "sort" "strconv" "github.com/kopia/kopia/internal/units" @@ -17,14 +16,6 @@ var ( ) func runContentStatsCommand(ctx context.Context, rep *repo.Repository) error { - contents, err := rep.Content.ListContentInfos("", true) - if err != nil { - return err - } - sort.Slice(contents, func(i, j int) bool { - return contents[i].Length < contents[j].Length - }) - var sizeThreshold uint32 = 10 countMap := map[uint32]int{} totalSizeOfContentsUnder := map[uint32]int64{} @@ -36,19 +27,21 @@ func runContentStatsCommand(ctx context.Context, rep *repo.Repository) error { } var totalSize int64 - for _, b := range contents { - totalSize += int64(b.Length) - for s := range countMap { - if b.Length < s { - countMap[s]++ - totalSizeOfContentsUnder[s] += int64(b.Length) + var count int64 + if err := rep.Content.IterateContents( + content.IterateOptions{}, + func(b content.Info) error { + totalSize += int64(b.Length) + count++ + for s := range countMap { + if b.Length < s { + countMap[s]++ + totalSizeOfContentsUnder[s] += int64(b.Length) + } } - } - } - - fmt.Printf("Content statistics\n") - if len(contents) == 0 { - return nil + return nil + }); err != nil { + return err } sizeToString := units.BytesStringBase10 @@ -56,31 +49,28 @@ func runContentStatsCommand(ctx context.Context, rep *repo.Repository) error { sizeToString = func(l int64) string { return strconv.FormatInt(l, 10) } } - fmt.Println("Size: ") - fmt.Println(" Total ", sizeToString(totalSize)) - fmt.Println(" Average ", sizeToString(totalSize/int64(len(contents)))) - fmt.Println(" 1st percentile ", sizeToString(percentileSize(1, contents))) - fmt.Println(" 5th percentile ", sizeToString(percentileSize(5, contents))) - fmt.Println(" 10th percentile ", sizeToString(percentileSize(10, contents))) - fmt.Println(" 50th percentile ", sizeToString(percentileSize(50, contents))) - fmt.Println(" 90th percentile ", sizeToString(percentileSize(90, contents))) - fmt.Println(" 95th percentile ", sizeToString(percentileSize(95, contents))) - fmt.Println(" 99th percentile ", sizeToString(percentileSize(99, contents))) + fmt.Println("Count:", count) + fmt.Println("Total:", sizeToString(totalSize)) + if count == 0 { + return nil + } + fmt.Println("Average:", sizeToString(totalSize/count)) - fmt.Println("Counts:") + fmt.Printf("Histogram:\n\n") + var lastSize uint32 for _, size := range sizeThresholds { - fmt.Printf(" %v contents with size <%v (total %v)\n", countMap[size], sizeToString(int64(size)), sizeToString(totalSizeOfContentsUnder[size])) + fmt.Printf("%9v between %v and %v (total %v)\n", + countMap[size]-countMap[lastSize], + sizeToString(int64(lastSize)), + sizeToString(int64(size)), + sizeToString(totalSizeOfContentsUnder[size]-totalSizeOfContentsUnder[lastSize]), + ) + lastSize = size } return nil } -func percentileSize(p int, contents []content.Info) int64 { - pos := p * len(contents) / 100 - - return int64(contents[pos].Length) -} - func init() { contentStatsCommand.Action(repositoryAction(runContentStatsCommand)) } diff --git a/cli/command_content_verify.go b/cli/command_content_verify.go index 28a0c306c3..1796bac8df 100644 --- a/cli/command_content_verify.go +++ b/cli/command_content_verify.go @@ -2,6 +2,7 @@ package cli import ( "context" + "sync/atomic" "github.com/pkg/errors" @@ -12,13 +13,14 @@ import ( var ( contentVerifyCommand = contentCommands.Command("verify", "Verify contents") - contentVerifyIDs = contentVerifyCommand.Arg("id", "IDs of blocks to show (or 'all')").Required().Strings() + contentVerifyIDs = contentVerifyCommand.Arg("id", "IDs of blocks to show (or 'all')").Required().Strings() + contentVerifyParallel = contentVerifyCommand.Flag("parallel", "Parallelism").Int() ) func runContentVerifyCommand(ctx context.Context, rep *repo.Repository) error { for _, contentID := range toContentIDs(*contentVerifyIDs) { if contentID == "all" { - return verifyAllBlocks(ctx, rep) + return verifyAllContents(ctx, rep) } if err := contentVerify(ctx, rep, contentID); err != nil { return err @@ -28,18 +30,20 @@ func runContentVerifyCommand(ctx context.Context, rep *repo.Repository) error { return nil } -func verifyAllBlocks(ctx context.Context, rep *repo.Repository) error { - contentIDs, err := rep.Content.ListContents("") +func verifyAllContents(ctx context.Context, rep *repo.Repository) error { + var errorCount int32 + err := rep.Content.IterateContents(content.IterateOptions{ + Parallel: *contentVerifyParallel, + }, func(ci content.Info) error { + if err := contentVerify(ctx, rep, ci.ID); err != nil { + atomic.AddInt32(&errorCount, 1) + } + return nil + }) if err != nil { - return errors.Wrap(err, "unable to list contents") + return errors.Wrap(err, "iterate contents") } - var errorCount int - for _, contentID := range contentIDs { - if err := contentVerify(ctx, rep, contentID); err != nil { - errorCount++ - } - } if errorCount == 0 { return nil } @@ -52,7 +56,6 @@ func contentVerify(ctx context.Context, r *repo.Repository, contentID content.ID log.Warningf("content %v is invalid: %v", contentID, err) return err } - log.Infof("content %v is ok", contentID) return nil } diff --git a/examples/upload_download/main.go b/examples/upload_download/main.go index d49da0e659..5f814e5fa5 100644 --- a/examples/upload_download/main.go +++ b/examples/upload_download/main.go @@ -9,6 +9,7 @@ import ( "os" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/content" ) func main() { @@ -29,12 +30,12 @@ func main() { uploadAndDownloadObjects(ctx, r) // Now list contents found in the repository. - cnts, err := r.Content.ListContents("") - if err != nil { + if err := r.Content.IterateContents( + content.IterateOptions{}, + func(ci content.Info) error { + log.Printf("found content %v", ci) + return nil + }); err != nil { log.Printf("err: %v", err) } - - for _, c := range cnts { - log.Printf("found content %v", c) - } } diff --git a/repo/blob/storage.go b/repo/blob/storage.go index 5c30122abe..a021117428 100644 --- a/repo/blob/storage.go +++ b/repo/blob/storage.go @@ -2,6 +2,7 @@ package blob import ( "context" + "sync" "time" "github.com/pkg/errors" @@ -71,6 +72,44 @@ func ListAllBlobs(ctx context.Context, st Storage, prefix ID) ([]Metadata, error return result, err } +// IterateAllPrefixesInParallel invokes the provided callback and returns the first error returned by the callback or nil. +func IterateAllPrefixesInParallel(ctx context.Context, parallelism int, st Storage, prefixes []ID, callback func(Metadata) error) error { + if len(prefixes) == 1 { + return st.ListBlobs(ctx, prefixes[0], callback) + } + + if parallelism <= 0 { + parallelism = 1 + } + + var wg sync.WaitGroup + semaphore := make(chan struct{}, parallelism) + errch := make(chan error, len(prefixes)) + for _, prefix := range prefixes { + wg.Add(1) + prefix := prefix + + // acquire semaphore + semaphore <- struct{}{} + go func() { + defer wg.Done() + defer func() { + <-semaphore // release semaphore + }() + + if err := st.ListBlobs(ctx, prefix, callback); err != nil { + errch <- err + } + }() + } + + wg.Wait() + close(errch) + + // return first error or nil + return <-errch +} + // ListAllBlobsConsistent lists all blobs with given name prefix in the provided storage until the results are // consistent. The results are consistent if the list result fetched twice is identical. This guarantees that while // the first scan was in progress, no new blob was added or removed. diff --git a/repo/content/builder.go b/repo/content/builder.go index 32119534fa..b055588318 100644 --- a/repo/content/builder.go +++ b/repo/content/builder.go @@ -21,7 +21,7 @@ func (b packIndexBuilder) clone() packIndexBuilder { } r := packIndexBuilder{} - for k, v := range r { + for k, v := range b { i2 := *v r[k] = &i2 } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index d365995e39..59adecfb13 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -1,4 +1,4 @@ -// Package content implements repository support content-addressable storage contents. +// Package content implements repository support for content-addressable storage. package content import ( @@ -568,57 +568,159 @@ func (bm *Manager) Close() { close(bm.closed) } +type IterateOptions struct { + Prefix ID + IncludeDeleted bool + Parallel int +} + +type IterateCallback func(Info) error +type cancelIterateFunc func() error + +func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (IterateCallback, cancelIterateFunc) { + if parallel <= 1 { + return originalCallback, func() error { return nil } + } + + workch := make(chan Info, parallel) + workererrch := make(chan error, 1) + var wg sync.WaitGroup + var once sync.Once + + lastWorkerError := func() error { + select { + case err := <-workererrch: + return err + default: + return nil + } + } + + cleanup := func() error { + once.Do(func() { + log.Infof("finishing parallel iteration") + close(workch) + wg.Wait() + log.Infof("finished parallel iteration") + }) + return lastWorkerError() + } + + callback := func(i Info) error { + workch <- i + return lastWorkerError() + } + + // start N workers, each fetching from the shared channel and invoking the provided callback. + // cleanup() must be called to for worker completion + for i := 0; i < parallel; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + for i := range workch { + if err := originalCallback(i); err != nil { + select { + case workererrch <- err: + default: + } + } + } + }() + } + + return callback, cleanup +} + // IterateContents invokes the provided callback for each content starting with a specified prefix // and possibly including deleted items. -func (bm *Manager) IterateContents(prefix ID, includeDeleted bool, callback func(i Info) error) error { +func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error { bm.lock() pibClone := bm.packIndexBuilder.clone() bm.unlock() - appendToResult := func(i Info) error { - if (i.Deleted && !includeDeleted) || !strings.HasPrefix(string(i.ID), string(prefix)) { - return nil + callback, cleanup := maybeParallelExecutor(opts.Parallel, callback) + defer cleanup() //nolint:errcheck + + invokeCallback := func(i Info) error { + if !opts.IncludeDeleted { + if ci, ok := pibClone[i.ID]; ok { + if ci.Deleted { + return nil + } + } else if i.Deleted { + return nil + } } - if ci, ok := pibClone[i.ID]; ok && ci.Deleted { + + if !strings.HasPrefix(string(i.ID), string(opts.Prefix)) { return nil } return callback(i) } - if len(pibClone) == 0 && includeDeleted && prefix == "" { + if len(pibClone) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 { // fast path, invoke callback directly - appendToResult = callback + invokeCallback = callback } for _, bi := range pibClone { - _ = appendToResult(*bi) + _ = invokeCallback(*bi) } - return bm.committedContents.listContents(prefix, appendToResult) -} + if err := bm.committedContents.listContents(opts.Prefix, invokeCallback); err != nil { + return err + } -// ListContents returns IDs of contents matching given prefix. -func (bm *Manager) ListContents(prefix ID) ([]ID, error) { - var result []ID + return cleanup() +} - err := bm.IterateContents(prefix, false, func(i Info) error { - result = append(result, i.ID) - return nil - }) +type IteratePackOptions struct { + IncludePacksWithOnlyDeletedContent bool + IncludeContentInfos bool +} - return result, err +type PackInfo struct { + PackID blob.ID + ContentCount int + TotalSize int64 + ContentInfos []Info } -// ListContentInfos returns the metadata about contents with a given prefix and kind. -func (bm *Manager) ListContentInfos(prefix ID, includeDeleted bool) ([]Info, error) { - var result []Info +type IteratePacksCallback func(PackInfo) error - err := bm.IterateContents(prefix, includeDeleted, func(i Info) error { - result = append(result, i) - return nil - }) +// IteratePacks invokes the provided callback for all pack blobs. +func (bm *Manager) IteratePacks(options IteratePackOptions, callback IteratePacksCallback) error { + packUsage := map[blob.ID]*PackInfo{} - return result, err + if err := bm.IterateContents( + IterateOptions{ + IncludeDeleted: options.IncludePacksWithOnlyDeletedContent, + }, + func(ci Info) error { + pi := packUsage[ci.PackBlobID] + if pi == nil { + pi = &PackInfo{} + packUsage[ci.PackBlobID] = pi + } + pi.PackID = ci.PackBlobID + pi.ContentCount++ + pi.TotalSize += int64(ci.Length) + if options.IncludeContentInfos { + pi.ContentInfos = append(pi.ContentInfos, ci) + } + return nil + }); err != nil { + return errors.Wrap(err, "error iterating contents") + } + + for _, v := range packUsage { + if err := callback(*v); err != nil { + return err + } + } + + return nil } // Flush completes writing any pending packs and writes pack indexes to the underlyign storage. @@ -798,82 +900,83 @@ func (bm *Manager) ContentInfo(ctx context.Context, contentID ID) (Info, error) return Info{}, err } - if bi.Deleted { - log.Debugf("ContentInfo(%q) - deleted", contentID) - } else { - log.Debugf("ContentInfo(%q) - exists in %v", contentID, bi.PackBlobID) - } - return bi, err } -func (bm *Manager) FindContentInShortPacks(threshold int64) ([]Info, error) { - infos, err := bm.ListContentInfos("", true) - if err != nil { - return nil, errors.Wrap(err, "unable to list index blobs") - } - +// IterateContentInShortPacks invokes the provided callback for all contents that are stored in +// packs shorter than the given threshold. +func (bm *Manager) IterateContentInShortPacks(threshold int64, callback IterateCallback) error { if threshold <= 0 { threshold = int64(bm.maxPackSize) * 8 / 10 } - var contentInfos []Info - for _, v := range groupByPackBlob(infos) { - if v.totalSize < threshold { - contentInfos = append(contentInfos, v.contentInfos...) - } - } - - return contentInfos, nil -} - -// FindUnreferencedBlobs returns the list of unreferenced storage contents. -func (bm *Manager) FindUnreferencedBlobs(ctx context.Context) ([]blob.Metadata, error) { - infos, err := bm.ListContentInfos("", true) - if err != nil { - return nil, errors.Wrap(err, "unable to list index blobs") - } - - usedPackContents := groupByPackBlob(infos) - - var unused []blob.Metadata - for _, prefix := range PackBlobIDPrefixes { - err := bm.st.ListBlobs(ctx, prefix, func(bi blob.Metadata) error { - u := usedPackContents[bi.BlobID] - if u.contentCount > 0 { - log.Debugf("pack %v, in use by %v contents (%v total size)", bi.BlobID, u.contentCount, u.totalSize) + return bm.IteratePacks( + IteratePackOptions{ + IncludePacksWithOnlyDeletedContent: true, + IncludeContentInfos: true, + }, + func(pi PackInfo) error { + if pi.TotalSize >= threshold { return nil } - unused = append(unused, bi) + for _, ci := range pi.ContentInfos { + if err := callback(ci); err != nil { + return err + } + } return nil - }) - if err != nil { - return nil, errors.Wrap(err, "error listing storage contents") - } + }, + ) +} + +// FindUnreferencedBlobs returns the list of unreferenced storage blobs. +func (bm *Manager) IterateUnreferencedBlobs(ctx context.Context, parallellism int, callback func(blob.Metadata) error) error { + usedPacks := map[blob.ID]bool{} + + log.Infof("determining blobs in use") + // find packs in use + if err := bm.IteratePacks( + IteratePackOptions{ + IncludePacksWithOnlyDeletedContent: true, + }, + func(pi PackInfo) error { + if pi.ContentCount > 0 { + usedPacks[pi.PackID] = true + } + return nil + }); err != nil { + return errors.Wrap(err, "error iterating packs") } + log.Infof("found %v pack blobs in use", len(usedPacks)) - return unused, nil -} - -type packCounters struct { - contentCount int - totalSize int64 - contentInfos []Info -} + unusedCount := 0 + var prefixes []blob.ID -func groupByPackBlob(infos []Info) map[blob.ID]packCounters { - packUsage := map[blob.ID]packCounters{} + if parallellism <= len(PackBlobIDPrefixes) { + prefixes = append(prefixes, PackBlobIDPrefixes...) + } else { + // iterate {p,q}[0-9,a-f] + for _, prefix := range PackBlobIDPrefixes { + for hexDigit := 0; hexDigit < 16; hexDigit++ { + prefixes = append(prefixes, blob.ID(fmt.Sprintf("%v%x", prefix, hexDigit))) + } + } + } + if err := blob.IterateAllPrefixesInParallel(ctx, parallellism, bm.st, prefixes, + func(bm blob.Metadata) error { + if usedPacks[bm.BlobID] { + return nil + } - for _, ci := range infos { - pc := packUsage[ci.PackBlobID] - pc.contentCount++ - pc.totalSize += int64(ci.Length) - pc.contentInfos = append(pc.contentInfos, ci) - packUsage[ci.PackBlobID] = pc + unusedCount++ + return callback(bm) + }); err != nil { + return errors.Wrap(err, "error iterating blobs") } + log.Infof("found %v pack blobs not in use", unusedCount) - return packUsage + return nil } func (bm *Manager) getCacheForContentID(id ID) *contentCache { diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 850e3043d9..779102928e 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -13,6 +13,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -571,17 +572,130 @@ func TestDeleteAndRecreate(t *testing.T) { } } +func TestIterateContents(t *testing.T) { + ctx := context.Background() + data := blobtesting.DataMap{} + keyTime := map[blob.ID]time.Time{} + bm := newTestContentManager(data, keyTime, nil) + // flushed, non-deleted + contentID1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) + + // flushed, deleted + contentID2 := writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100)) + bm.Flush(ctx) + + if err := bm.DeleteContent(contentID2); err != nil { + t.Errorf("error deleting content 2 %v", err) + } + + // pending, non-deleted + contentID3 := writeContentAndVerify(ctx, t, bm, seededRandomData(12, 100)) + + // pending, deleted - is completely discarded + contentID4 := writeContentAndVerify(ctx, t, bm, seededRandomData(13, 100)) + if err := bm.DeleteContent(contentID4); err != nil { + t.Errorf("error deleting content 4 %v", err) + } + t.Logf("contentID1: %v", contentID1) + t.Logf("contentID2: %v", contentID2) + t.Logf("contentID3: %v", contentID3) + t.Logf("contentID4: %v", contentID4) + + cases := []struct { + desc string + options IterateOptions + want map[ID]bool + }{ + { + desc: "default options", + options: IterateOptions{}, + want: map[ID]bool{contentID1: true, contentID3: true}, + }, + { + desc: "include deleted", + options: IterateOptions{IncludeDeleted: true}, + want: map[ID]bool{ + contentID1: true, + contentID2: true, + contentID3: true, + }, + }, + { + desc: "parallel", + options: IterateOptions{ + Parallel: 10, + }, + want: map[ID]bool{ + contentID1: true, + contentID3: true, + }, + }, + { + desc: "parallel, include deleted", + options: IterateOptions{ + Parallel: 10, + IncludeDeleted: true, + }, + want: map[ID]bool{ + contentID1: true, + contentID2: true, + contentID3: true, + }, + }, + { + desc: "prefix match", + options: IterateOptions{ + Prefix: contentID1, + }, + want: map[ID]bool{contentID1: true}, + }, + { + desc: "prefix, include deleted", + options: IterateOptions{ + Prefix: contentID2, + IncludeDeleted: true, + }, + want: map[ID]bool{ + contentID2: true, + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + var mu sync.Mutex + got := map[ID]bool{} + + err := bm.IterateContents(tc.options, func(ci Info) error { + mu.Lock() + got[ci.ID] = true + mu.Unlock() + return nil + }) + + if err != nil { + t.Errorf("error iterating: %v", err) + } + + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("invalid content IDs got: %v, want %v", got, tc.want) + } + }) + } +} + func TestFindUnreferencedBlobs(t *testing.T) { ctx := context.Background() data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} bm := newTestContentManager(data, keyTime, nil) - verifyUnreferencedStorageFilesCount(ctx, t, bm, 0) + verifyUnreferencedBlobsCount(ctx, t, bm, 0) contentID := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) if err := bm.Flush(ctx); err != nil { t.Errorf("flush error: %v", err) } - verifyUnreferencedStorageFilesCount(ctx, t, bm, 0) + verifyUnreferencedBlobsCount(ctx, t, bm, 0) if err := bm.DeleteContent(contentID); err != nil { t.Errorf("error deleting content: %v", contentID) } @@ -590,18 +704,18 @@ func TestFindUnreferencedBlobs(t *testing.T) { } // content still present in first pack - verifyUnreferencedStorageFilesCount(ctx, t, bm, 0) + verifyUnreferencedBlobsCount(ctx, t, bm, 0) assertNoError(t, bm.RewriteContent(ctx, contentID)) if err := bm.Flush(ctx); err != nil { t.Errorf("flush error: %v", err) } - verifyUnreferencedStorageFilesCount(ctx, t, bm, 1) + verifyUnreferencedBlobsCount(ctx, t, bm, 1) assertNoError(t, bm.RewriteContent(ctx, contentID)) if err := bm.Flush(ctx); err != nil { t.Errorf("flush error: %v", err) } - verifyUnreferencedStorageFilesCount(ctx, t, bm, 2) + verifyUnreferencedBlobsCount(ctx, t, bm, 2) } func TestFindUnreferencedBlobs2(t *testing.T) { @@ -609,7 +723,7 @@ func TestFindUnreferencedBlobs2(t *testing.T) { data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} bm := newTestContentManager(data, keyTime, nil) - verifyUnreferencedStorageFilesCount(ctx, t, bm, 0) + verifyUnreferencedBlobsCount(ctx, t, bm, 0) contentID := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100)) dumpContents(t, bm, "after writing") @@ -617,7 +731,7 @@ func TestFindUnreferencedBlobs2(t *testing.T) { t.Errorf("flush error: %v", err) } dumpContents(t, bm, "after flush") - verifyUnreferencedStorageFilesCount(ctx, t, bm, 0) + verifyUnreferencedBlobsCount(ctx, t, bm, 0) if err := bm.DeleteContent(contentID); err != nil { t.Errorf("error deleting content: %v", contentID) } @@ -627,33 +741,38 @@ func TestFindUnreferencedBlobs2(t *testing.T) { } dumpContents(t, bm, "after flush") // content present in first pack, original pack is still referenced - verifyUnreferencedStorageFilesCount(ctx, t, bm, 0) + verifyUnreferencedBlobsCount(ctx, t, bm, 0) } func dumpContents(t *testing.T, bm *Manager, caption string) { t.Helper() - infos, err := bm.ListContentInfos("", true) - if err != nil { + count := 0 + log.Infof("finished dumping %v contents", caption) + if err := bm.IterateContents(IterateOptions{IncludeDeleted: true}, + func(ci Info) error { + log.Debugf(" ci[%v]=%#v", count, ci) + count++ + return nil + }); err != nil { t.Errorf("error listing contents: %v", err) return } - - log.Infof("**** dumping %v contents %v", len(infos), caption) - for i, bi := range infos { - log.Debugf(" bi[%v]=%#v", i, bi) - } - log.Infof("finished dumping %v contents", len(infos)) + log.Infof("finished dumping %v %v contents", count, caption) } -func verifyUnreferencedStorageFilesCount(ctx context.Context, t *testing.T, bm *Manager, want int) { +func verifyUnreferencedBlobsCount(ctx context.Context, t *testing.T, bm *Manager, want int) { t.Helper() - unref, err := bm.FindUnreferencedBlobs(ctx) + var unrefCount int32 + err := bm.IterateUnreferencedBlobs(ctx, 1, func(_ blob.Metadata) error { + atomic.AddInt32(&unrefCount, 1) + return nil + }) if err != nil { - t.Errorf("error in FindUnreferencedBlobs: %v", err) + t.Errorf("error in IterateUnreferencedBlobs: %v", err) } - log.Infof("got %v expecting %v", unref, want) - if got := len(unref); got != want { + log.Infof("got %v expecting %v", unrefCount, want) + if got := int(unrefCount); got != want { t.Errorf("invalid number of unreferenced contents: %v, wanted %v", got, want) } } diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index ed96ce388e..83dc2c49fa 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -30,7 +30,7 @@ type contentManager interface { GetContent(ctx context.Context, contentID content.ID) ([]byte, error) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) DeleteContent(contentID content.ID) error - ListContents(prefix content.ID) ([]content.ID, error) + IterateContents(content.IterateOptions, content.IterateCallback) error DisableIndexFlush() EnableIndexFlush() Flush(ctx context.Context) error @@ -277,17 +277,28 @@ func (m *Manager) Refresh(ctx context.Context) error { func (m *Manager) loadCommittedContentsLocked(ctx context.Context) error { log.Debugf("listing manifest contents") - for { - contents, err := m.b.ListContents(manifestContentPrefix) - if err != nil { - return errors.Wrap(err, "unable to list manifest contents") - } - m.committedEntries = map[ID]*manifestEntry{} - m.committedContentIDs = map[content.ID]bool{} + var ( + mu sync.Mutex + manifests map[content.ID]manifest + ) - log.Debugf("found %v manifest contents", len(contents)) - err = m.loadManifestContents(ctx, contents) + for { + manifests = map[content.ID]manifest{} + + err := m.b.IterateContents(content.IterateOptions{ + Prefix: manifestContentPrefix, + Parallel: 8, + }, func(ci content.Info) error { + man, err := m.loadManifestContent(ctx, ci.ID) + if err != nil { + return err + } + mu.Lock() + manifests[ci.ID] = man + mu.Unlock() + return nil + }) if err == nil { // success break @@ -299,6 +310,8 @@ func (m *Manager) loadCommittedContentsLocked(ctx context.Context) error { return errors.Wrap(err, "unable to load manifest contents") } + m.loadManifestContentsLocked(manifests) + if err := m.maybeCompactLocked(ctx); err != nil { return errors.Errorf("error auto-compacting contents") } @@ -306,16 +319,12 @@ func (m *Manager) loadCommittedContentsLocked(ctx context.Context) error { return nil } -func (m *Manager) loadManifestContents(ctx context.Context, contentIDs []content.ID) error { - t0 := time.Now() +func (m *Manager) loadManifestContentsLocked(manifests map[content.ID]manifest) { + m.committedEntries = map[ID]*manifestEntry{} + m.committedContentIDs = map[content.ID]bool{} - for _, b := range contentIDs { - m.committedContentIDs[b] = true - } - - manifests, err := m.loadContentsInParallel(ctx, contentIDs) - if err != nil { - return err + for contentID := range manifests { + m.committedContentIDs[contentID] = true } for _, man := range manifests { @@ -330,60 +339,6 @@ func (m *Manager) loadManifestContents(ctx context.Context, contentIDs []content delete(m.committedEntries, k) } } - - log.Debugf("finished loading manifest contents in %v.", time.Since(t0)) - - return nil -} - -func (m *Manager) loadContentsInParallel(ctx context.Context, contentIDs []content.ID) ([]manifest, error) { - errch := make(chan error, len(contentIDs)) - manifests := make(chan manifest, len(contentIDs)) - ch := make(chan content.ID, len(contentIDs)) - var wg sync.WaitGroup - - for i := 0; i < 8; i++ { - wg.Add(1) - go func(workerID int) { - defer wg.Done() - - for blk := range ch { - t1 := time.Now() - man, err := m.loadManifestContent(ctx, blk) - - if err != nil { - errch <- err - log.Debugf("manifest content %v failed to be loaded by worker %v in %v: %v.", blk, workerID, time.Since(t1), err) - } else { - log.Debugf("manifest content %v loaded by worker %v in %v.", blk, workerID, time.Since(t1)) - manifests <- man - } - } - }(i) - } - - // feed manifest content IDs for goroutines - for _, b := range contentIDs { - ch <- b - } - close(ch) - - // wait for workers to complete - wg.Wait() - close(errch) - close(manifests) - - // if there was any error, forward it - if err := <-errch; err != nil { - return nil, err - } - - var man []manifest - for m := range manifests { - man = append(man, m) - } - - return man, nil } func (m *Manager) loadManifestContent(ctx context.Context, contentID content.ID) (manifest, error) { diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index dc46dd6ed6..d3fd3861ee 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -96,11 +96,16 @@ func TestManifest(t *testing.T) { t.Errorf("can't compact: %v", err) } - blks, err := mgr.b.ListContents(manifestContentPrefix) - if err != nil { - t.Errorf("unable to list manifest blocks: %v", err) - } - if got, want := len(blks), 1; got != want { + foundContents := 0 + if err := mgr.b.IterateContents( + content.IterateOptions{Prefix: manifestContentPrefix}, + func(ci content.Info) error { + foundContents++ + return nil + }); err != nil { + t.Errorf("unable to list manifest content: %v", err) + } + if got, want := foundContents, 1; got != want { t.Errorf("unexpected number of blocks: %v, want %v", got, want) } diff --git a/tests/repository_stress_test/repository_stress_test.go b/tests/repository_stress_test/repository_stress_test.go index 5c0f3a8769..414e650ea0 100644 --- a/tests/repository_stress_test/repository_stress_test.go +++ b/tests/repository_stress_test/repository_stress_test.go @@ -240,28 +240,28 @@ func readKnownBlock(ctx context.Context, t *testing.T, r *repo.Repository) error } func listContents(ctx context.Context, t *testing.T, r *repo.Repository) error { - _, err := r.Content.ListContents("") - return err + return r.Content.IterateContents( + content.IterateOptions{}, + func(i content.Info) error { return nil }, + ) } func listAndReadAllContents(ctx context.Context, t *testing.T, r *repo.Repository) error { - contentIDs, err := r.Content.ListContents("") - if err != nil { - return err - } - - for _, cid := range contentIDs { - _, err := r.Content.GetContent(ctx, cid) - if err != nil { - if err == content.ErrContentNotFound && strings.HasPrefix(string(cid), "m") { - // this is ok, sometimes manifest manager will perform compaction and 'm' contents will be marked as deleted - continue + return r.Content.IterateContents( + content.IterateOptions{}, + func(ci content.Info) error { + cid := ci.ID + _, err := r.Content.GetContent(ctx, cid) + if err != nil { + if err == content.ErrContentNotFound && strings.HasPrefix(string(cid), "m") { + // this is ok, sometimes manifest manager will perform compaction and 'm' contents will be marked as deleted + return nil + } + return errors.Wrapf(err, "error reading content %v", cid) } - return errors.Wrapf(err, "error reading content %v", cid) - } - } - return nil + return nil + }) } func compact(ctx context.Context, t *testing.T, r *repo.Repository) error {