Skip to content

Commit

Permalink
removed content.Manager.ListContents*() API
Browse files Browse the repository at this point in the history
  • Loading branch information
jkowalski committed Jul 28, 2019
1 parent 6f5abf5 commit fdd06b8
Show file tree
Hide file tree
Showing 13 changed files with 531 additions and 310 deletions.
15 changes: 12 additions & 3 deletions cli/command_content_gc.go
Expand Up @@ -2,20 +2,30 @@ package cli

import (
"context"
"sync"

"github.com/pkg/errors"

"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
)

var (
contentGarbageCollectCommand = contentCommands.Command("gc", "Garbage-collect unused blobs")
contentGarbageCollectCommandDelete = contentGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String()
contentGarbageCollectParallel = contentGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Int()
)

func runContentGarbageCollectCommand(ctx context.Context, rep *repo.Repository) error {
unused, err := rep.Content.FindUnreferencedBlobs(ctx)
if err != nil {
var mu sync.Mutex
var unused []blob.Metadata

if err := rep.Content.IterateUnreferencedBlobs(ctx, *contentGarbageCollectParallel, func(bm blob.Metadata) error {
mu.Lock()
unused = append(unused, bm)
mu.Unlock()
return nil
}); err != nil {
return errors.Wrap(err, "error looking for unreferenced blobs")
}

Expand All @@ -27,7 +37,6 @@ func runContentGarbageCollectCommand(ctx context.Context, rep *repo.Repository)
if *contentGarbageCollectCommandDelete != "yes" {
var totalBytes int64
for _, u := range unused {
printStderr("unused %v (%v bytes)\n", u.BlobID, u.Length)
totalBytes += u.Length
}
printStderr("Would delete %v unused blobs (%v bytes), pass '--delete=yes' to actually delete.\n", len(unused), totalBytes)
Expand Down
60 changes: 30 additions & 30 deletions cli/command_content_list.go
Expand Up @@ -3,11 +3,11 @@ package cli
import (
"context"
"fmt"
"sync/atomic"

"github.com/pkg/errors"

"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
)

Expand All @@ -22,45 +22,45 @@ var (
)

func runContentListCommand(ctx context.Context, rep *repo.Repository) error {
var count int
var count int32
var totalSize int64
uniquePacks := map[blob.ID]bool{}
err := rep.Content.IterateContents(content.ID(*contentListPrefix), *contentListIncludeDeleted || *contentListDeletedOnly, func(b content.Info) error {
if *contentListDeletedOnly && !b.Deleted {
return nil
}
totalSize += int64(b.Length)
count++
if b.PackBlobID != "" {
uniquePacks[b.PackBlobID] = true
}
if *contentListLong {
optionalDeleted := ""
if b.Deleted {
optionalDeleted = " (deleted)"
err := rep.Content.IterateContents(
content.IterateOptions{
Prefix: content.ID(*contentListPrefix),
IncludeDeleted: *contentListIncludeDeleted || *contentListDeletedOnly,
},
func(b content.Info) error {
if *contentListDeletedOnly && !b.Deleted {
return nil
}
atomic.AddInt64(&totalSize, int64(b.Length))
atomic.AddInt32(&count, 1)
if *contentListLong {
optionalDeleted := ""
if b.Deleted {
optionalDeleted = " (deleted)"
}
fmt.Printf("%v %v %v %v+%v%v\n",
b.ID,
formatTimestamp(b.Timestamp()),
b.PackBlobID,
b.PackOffset,
maybeHumanReadableBytes(*contentListHuman, int64(b.Length)),
optionalDeleted)
} else {
fmt.Printf("%v\n", b.ID)
}
fmt.Printf("%v %v %v %v+%v%v\n",
b.ID,
formatTimestamp(b.Timestamp()),
b.PackBlobID,
b.PackOffset,
maybeHumanReadableBytes(*contentListHuman, int64(b.Length)),
optionalDeleted)
} else {
fmt.Printf("%v\n", b.ID)
}

return nil
})
return nil
})

if err != nil {
return errors.Wrap(err, "error iterating")
}

if *contentListSummary {
fmt.Printf("Total: %v contents, %v packs, %v total size\n",
fmt.Printf("Total: %v contents, %v total size\n",
maybeHumanReadableCount(*contentListHuman, int64(count)),
maybeHumanReadableCount(*contentListHuman, int64(len(uniquePacks))),
maybeHumanReadableBytes(*contentListHuman, totalSize))
}

Expand Down
31 changes: 14 additions & 17 deletions cli/command_content_rewrite.go
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"strings"
"sync"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -130,28 +129,26 @@ func findContentInfos(ctx context.Context, rep *repo.Repository, ch chan content
}

func findContentWithFormatVersion(rep *repo.Repository, ch chan contentInfoOrError, version int) {
_ = rep.Content.IterateContents("", true, func(b content.Info) error {
if int(b.FormatVersion) == version && strings.HasPrefix(string(b.PackBlobID), *contentRewritePackPrefix) {
ch <- contentInfoOrError{Info: b}
}
return nil
})
_ = rep.Content.IterateContents(
content.IterateOptions{IncludeDeleted: true},
func(b content.Info) error {
if int(b.FormatVersion) == version && strings.HasPrefix(string(b.PackBlobID), *contentRewritePackPrefix) {
ch <- contentInfoOrError{Info: b}
}
return nil
})
}

func findContentInShortPacks(rep *repo.Repository, ch chan contentInfoOrError, threshold int64) {
t0 := time.Now()
contentIDs, err := rep.Content.FindContentInShortPacks(threshold)
log.Infof("content in short packs determined in %v", time.Since(t0))
if err != nil {
if err := rep.Content.IterateContentInShortPacks(threshold, func(ci content.Info) error {
if ci.ID.HasPrefix() == *contentRewritePrefixed {
ch <- contentInfoOrError{Info: ci}
}
return nil
}); err != nil {
ch <- contentInfoOrError{err: err}
return
}

for _, b := range contentIDs {
if b.ID.HasPrefix() == *contentRewritePrefixed {
ch <- contentInfoOrError{Info: b}
}
}
}

func init() {
Expand Down
68 changes: 29 additions & 39 deletions cli/command_content_stats.go
Expand Up @@ -3,7 +3,6 @@ package cli
import (
"context"
"fmt"
"sort"
"strconv"

"github.com/kopia/kopia/internal/units"
Expand All @@ -17,14 +16,6 @@ var (
)

func runContentStatsCommand(ctx context.Context, rep *repo.Repository) error {
contents, err := rep.Content.ListContentInfos("", true)
if err != nil {
return err
}
sort.Slice(contents, func(i, j int) bool {
return contents[i].Length < contents[j].Length
})

var sizeThreshold uint32 = 10
countMap := map[uint32]int{}
totalSizeOfContentsUnder := map[uint32]int64{}
Expand All @@ -36,51 +27,50 @@ func runContentStatsCommand(ctx context.Context, rep *repo.Repository) error {
}

var totalSize int64
for _, b := range contents {
totalSize += int64(b.Length)
for s := range countMap {
if b.Length < s {
countMap[s]++
totalSizeOfContentsUnder[s] += int64(b.Length)
var count int64
if err := rep.Content.IterateContents(
content.IterateOptions{},
func(b content.Info) error {
totalSize += int64(b.Length)
count++
for s := range countMap {
if b.Length < s {
countMap[s]++
totalSizeOfContentsUnder[s] += int64(b.Length)
}
}
}
}

fmt.Printf("Content statistics\n")
if len(contents) == 0 {
return nil
return nil
}); err != nil {
return err
}

sizeToString := units.BytesStringBase10
if *contentStatsRaw {
sizeToString = func(l int64) string { return strconv.FormatInt(l, 10) }
}

fmt.Println("Size: ")
fmt.Println(" Total ", sizeToString(totalSize))
fmt.Println(" Average ", sizeToString(totalSize/int64(len(contents))))
fmt.Println(" 1st percentile ", sizeToString(percentileSize(1, contents)))
fmt.Println(" 5th percentile ", sizeToString(percentileSize(5, contents)))
fmt.Println(" 10th percentile ", sizeToString(percentileSize(10, contents)))
fmt.Println(" 50th percentile ", sizeToString(percentileSize(50, contents)))
fmt.Println(" 90th percentile ", sizeToString(percentileSize(90, contents)))
fmt.Println(" 95th percentile ", sizeToString(percentileSize(95, contents)))
fmt.Println(" 99th percentile ", sizeToString(percentileSize(99, contents)))
fmt.Println("Count:", count)
fmt.Println("Total:", sizeToString(totalSize))
if count == 0 {
return nil
}
fmt.Println("Average:", sizeToString(totalSize/count))

fmt.Println("Counts:")
fmt.Printf("Histogram:\n\n")
var lastSize uint32
for _, size := range sizeThresholds {
fmt.Printf(" %v contents with size <%v (total %v)\n", countMap[size], sizeToString(int64(size)), sizeToString(totalSizeOfContentsUnder[size]))
fmt.Printf("%9v between %v and %v (total %v)\n",
countMap[size]-countMap[lastSize],
sizeToString(int64(lastSize)),
sizeToString(int64(size)),
sizeToString(totalSizeOfContentsUnder[size]-totalSizeOfContentsUnder[lastSize]),
)
lastSize = size
}

return nil
}

func percentileSize(p int, contents []content.Info) int64 {
pos := p * len(contents) / 100

return int64(contents[pos].Length)
}

func init() {
contentStatsCommand.Action(repositoryAction(runContentStatsCommand))
}
27 changes: 15 additions & 12 deletions cli/command_content_verify.go
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"context"
"sync/atomic"

"github.com/pkg/errors"

Expand All @@ -12,13 +13,14 @@ import (
var (
contentVerifyCommand = contentCommands.Command("verify", "Verify contents")

contentVerifyIDs = contentVerifyCommand.Arg("id", "IDs of blocks to show (or 'all')").Required().Strings()
contentVerifyIDs = contentVerifyCommand.Arg("id", "IDs of blocks to show (or 'all')").Required().Strings()
contentVerifyParallel = contentVerifyCommand.Flag("parallel", "Parallelism").Int()
)

func runContentVerifyCommand(ctx context.Context, rep *repo.Repository) error {
for _, contentID := range toContentIDs(*contentVerifyIDs) {
if contentID == "all" {
return verifyAllBlocks(ctx, rep)
return verifyAllContents(ctx, rep)
}
if err := contentVerify(ctx, rep, contentID); err != nil {
return err
Expand All @@ -28,18 +30,20 @@ func runContentVerifyCommand(ctx context.Context, rep *repo.Repository) error {
return nil
}

func verifyAllBlocks(ctx context.Context, rep *repo.Repository) error {
contentIDs, err := rep.Content.ListContents("")
func verifyAllContents(ctx context.Context, rep *repo.Repository) error {
var errorCount int32
err := rep.Content.IterateContents(content.IterateOptions{
Parallel: *contentVerifyParallel,
}, func(ci content.Info) error {
if err := contentVerify(ctx, rep, ci.ID); err != nil {
atomic.AddInt32(&errorCount, 1)
}
return nil
})
if err != nil {
return errors.Wrap(err, "unable to list contents")
return errors.Wrap(err, "iterate contents")
}

var errorCount int
for _, contentID := range contentIDs {
if err := contentVerify(ctx, rep, contentID); err != nil {
errorCount++
}
}
if errorCount == 0 {
return nil
}
Expand All @@ -52,7 +56,6 @@ func contentVerify(ctx context.Context, r *repo.Repository, contentID content.ID
log.Warningf("content %v is invalid: %v", contentID, err)
return err
}

log.Infof("content %v is ok", contentID)
return nil
}
Expand Down
13 changes: 7 additions & 6 deletions examples/upload_download/main.go
Expand Up @@ -9,6 +9,7 @@ import (
"os"

"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
)

func main() {
Expand All @@ -29,12 +30,12 @@ func main() {
uploadAndDownloadObjects(ctx, r)

// Now list contents found in the repository.
cnts, err := r.Content.ListContents("")
if err != nil {
if err := r.Content.IterateContents(
content.IterateOptions{},
func(ci content.Info) error {
log.Printf("found content %v", ci)
return nil
}); err != nil {
log.Printf("err: %v", err)
}

for _, c := range cnts {
log.Printf("found content %v", c)
}
}

0 comments on commit fdd06b8

Please sign in to comment.