Skip to content

Commit

Permalink
chore(repository): extracted content index building and parsing into …
Browse files Browse the repository at this point in the history
…repo/content/index (#1881)
  • Loading branch information
jkowalski committed Apr 6, 2022
1 parent 4548160 commit 5d87d81
Show file tree
Hide file tree
Showing 31 changed files with 431 additions and 367 deletions.
7 changes: 4 additions & 3 deletions cli/command_content_range_flags.go
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/alecthomas/kingpin"

"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
)

type contentRangeFlags struct {
Expand All @@ -20,12 +21,12 @@ func (c *contentRangeFlags) setup(cmd *kingpin.CmdClause) {

// contentIDRange returns the content ID range selected by the flags:
// all prefixed IDs, all non-prefixed IDs, or the range of IDs matching
// the configured prefix (empty prefix selects all IDs).
func (c *contentRangeFlags) contentIDRange() content.IDRange {
	if c.contentIDPrefixed {
		return index.AllPrefixedIDs
	}

	if c.contentIDNonPrefixed {
		return index.AllNonPrefixedIDs
	}

	return index.PrefixRange(content.ID(c.contentIDPrefix))
}
85 changes: 85 additions & 0 deletions internal/indextest/indextest.go
@@ -0,0 +1,85 @@
// Package indextest provides utilities for testing content index.
package indextest

import (
"fmt"
"reflect"
"strings"

"github.com/kopia/kopia/repo/content/index"
)

// InfoDiff returns a list of differences between two index.Info, empty if they are equal.
// Differences whose description starts with any of the provided ignore prefixes are
// excluded from the result.
// nolint:gocyclo
func InfoDiff(i1, i2 index.Info, ignore ...string) []string {
	var diffs []string

	if l, r := i1.GetContentID(), i2.GetContentID(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetContentID %v != %v", l, r))
	}

	if l, r := i1.GetPackBlobID(), i2.GetPackBlobID(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetPackBlobID %v != %v", l, r))
	}

	if l, r := i1.GetDeleted(), i2.GetDeleted(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetDeleted %v != %v", l, r))
	}

	if l, r := i1.GetFormatVersion(), i2.GetFormatVersion(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetFormatVersion %v != %v", l, r))
	}

	if l, r := i1.GetOriginalLength(), i2.GetOriginalLength(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetOriginalLength %v != %v", l, r))
	}

	if l, r := i1.GetPackOffset(), i2.GetPackOffset(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetPackOffset %v != %v", l, r))
	}

	if l, r := i1.GetPackedLength(), i2.GetPackedLength(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetPackedLength %v != %v", l, r))
	}

	if l, r := i1.GetTimestampSeconds(), i2.GetTimestampSeconds(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetTimestampSeconds %v != %v", l, r))
	}

	// Timestamps are compared with Equal rather than != so that equal instants
	// in different locations compare equal.
	if l, r := i1.Timestamp(), i2.Timestamp(); !l.Equal(r) {
		diffs = append(diffs, fmt.Sprintf("Timestamp %v != %v", l, r))
	}

	if l, r := i1.GetCompressionHeaderID(), i2.GetCompressionHeaderID(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetCompressionHeaderID %v != %v", l, r))
	}

	if l, r := i1.GetEncryptionKeyID(), i2.GetEncryptionKeyID(); l != r {
		diffs = append(diffs, fmt.Sprintf("GetEncryptionKeyID %v != %v", l, r))
	}

	// dear future reader, if this fails because the number of methods has changed,
	// you need to add additional verification above.
	// nolint:gomnd
	if cnt := reflect.TypeOf((*index.Info)(nil)).Elem().NumMethod(); cnt != 11 {
		diffs = append(diffs, fmt.Sprintf("unexpected number of methods on index.Info: %v, must update the test", cnt))
	}

	var result []string

	for _, v := range diffs {
		ignored := false

		for _, ign := range ignore {
			if strings.HasPrefix(v, ign) {
				ignored = true
				break
			}
		}

		if !ignored {
			result = append(result, v)
		}
	}

	return result
}
30 changes: 16 additions & 14 deletions repo/content/committed_content_index.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/logging"
)

Expand All @@ -32,9 +33,9 @@ type committedContentIndex struct {
// +checklocks:mu
deletionWatermark time.Time
// +checklocks:mu
inUse map[blob.ID]packIndex
inUse map[blob.ID]index.Index
// +checklocks:mu
merged mergedIndex
merged index.Merged

v1PerContentOverhead uint32
indexVersion int
Expand All @@ -48,7 +49,7 @@ type committedContentIndex struct {
// committedContentIndexCache caches committed index blobs locally and opens
// them as index.Index. Implementations exist for on-disk and in-memory storage.
type committedContentIndexCache interface {
	hasIndexBlobID(ctx context.Context, indexBlob blob.ID) (bool, error)
	addContentToCache(ctx context.Context, indexBlob blob.ID, data gather.Bytes) error
	openIndex(ctx context.Context, indexBlob blob.ID) (index.Index, error)
	expireUnused(ctx context.Context, used []blob.ID) error
}

Expand All @@ -73,7 +74,7 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) {
return nil, ErrContentNotFound
}

return nil, err
return nil, errors.Wrap(err, "error getting content info from index")
}

func shouldIgnore(id Info, deletionWatermark time.Time) bool {
Expand Down Expand Up @@ -123,10 +124,11 @@ func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID bl

func (c *committedContentIndex) listContents(r IDRange, cb func(i Info) error) error {
c.mu.Lock()
m := append(mergedIndex(nil), c.merged...)
m := append(index.Merged(nil), c.merged...)
deletionWatermark := c.deletionWatermark
c.mu.Unlock()

// nolint:wrapcheck
return m.Iterate(r, func(i Info) error {
if shouldIgnore(i, deletionWatermark) {
return nil
Expand All @@ -151,8 +153,8 @@ func (c *committedContentIndex) indexFilesChanged(indexFiles []blob.ID) bool {
return false
}

func (c *committedContentIndex) merge(ctx context.Context, indexFiles []blob.ID) (merged mergedIndex, used map[blob.ID]packIndex, finalErr error) {
used = map[blob.ID]packIndex{}
func (c *committedContentIndex) merge(ctx context.Context, indexFiles []blob.ID) (merged index.Merged, used map[blob.ID]index.Index, finalErr error) {
used = map[blob.ID]index.Index{}

defer func() {
// we failed along the way, close the merged index.
Expand Down Expand Up @@ -214,8 +216,8 @@ func (c *committedContentIndex) use(ctx context.Context, indexFiles []blob.ID, i
return nil
}

func (c *committedContentIndex) combineSmallIndexes(m mergedIndex) (mergedIndex, error) {
var toKeep, toMerge mergedIndex
func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merged, error) {
var toKeep, toMerge index.Merged

for _, ndx := range m {
if ndx.ApproximateCount() < smallIndexEntryCountThreshold {
Expand All @@ -229,10 +231,10 @@ func (c *committedContentIndex) combineSmallIndexes(m mergedIndex) (mergedIndex,
return m, nil
}

b := packIndexBuilder{}
b := index.Builder{}

for _, ndx := range toMerge {
if err := ndx.Iterate(AllIDs, func(i Info) error {
if err := ndx.Iterate(index.AllIDs, func(i Info) error {
b.Add(i)
return nil
}); err != nil {
Expand All @@ -246,7 +248,7 @@ func (c *committedContentIndex) combineSmallIndexes(m mergedIndex) (mergedIndex,
return nil, errors.Wrap(err, "error building combined in-memory index")
}

combined, err := openPackIndex(bytes.NewReader(buf.Bytes()), c.v1PerContentOverhead)
combined, err := index.Open(bytes.NewReader(buf.Bytes()), c.v1PerContentOverhead)
if err != nil {
return nil, errors.Wrap(err, "error opening combined in-memory index")
}
Expand Down Expand Up @@ -342,14 +344,14 @@ func newCommittedContentIndex(caching *CachingOptions,
cache = &diskCommittedContentIndexCache{dirname, clock.Now, v1PerContentOverhead, log, minSweepAge}
} else {
cache = &memoryCommittedContentIndexCache{
contents: map[blob.ID]packIndex{},
contents: map[blob.ID]index.Index{},
v1PerContentOverhead: v1PerContentOverhead,
}
}

return &committedContentIndex{
cache: cache,
inUse: map[blob.ID]packIndex{},
inUse: map[blob.ID]index.Index{},
v1PerContentOverhead: v1PerContentOverhead,
indexVersion: indexVersion,
fetchOne: fetchOne,
Expand Down
13 changes: 7 additions & 6 deletions repo/content/committed_content_index_cache_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/logging"
)

Expand All @@ -27,7 +28,7 @@ func TestCommittedContentIndexCache_Memory(t *testing.T) {
t.Parallel()

testCache(t, &memoryCommittedContentIndexCache{
contents: map[blob.ID]packIndex{},
contents: map[blob.ID]index.Index{},
v1PerContentOverhead: 3,
}, nil)
}
Expand All @@ -47,7 +48,7 @@ func testCache(t *testing.T, cache committedContentIndexCache, fakeTime *faketim
t.Fatal("openIndex unexpectedly succeeded")
}

require.NoError(t, cache.addContentToCache(ctx, "ndx1", mustBuildPackIndex(t, packIndexBuilder{
require.NoError(t, cache.addContentToCache(ctx, "ndx1", mustBuildIndex(t, index.Builder{
"c1": &InfoStruct{PackBlobID: "p1234", ContentID: "c1"},
"c2": &InfoStruct{PackBlobID: "p1234", ContentID: "c2"},
})))
Expand All @@ -59,12 +60,12 @@ func testCache(t *testing.T, cache committedContentIndexCache, fakeTime *faketim
t.Fatal("hasIndexBlobID invalid response, expected true")
}

require.NoError(t, cache.addContentToCache(ctx, "ndx2", mustBuildPackIndex(t, packIndexBuilder{
require.NoError(t, cache.addContentToCache(ctx, "ndx2", mustBuildIndex(t, index.Builder{
"c3": &InfoStruct{PackBlobID: "p2345", ContentID: "c3"},
"c4": &InfoStruct{PackBlobID: "p2345", ContentID: "c4"},
})))

require.NoError(t, cache.addContentToCache(ctx, "ndx2", mustBuildPackIndex(t, packIndexBuilder{
require.NoError(t, cache.addContentToCache(ctx, "ndx2", mustBuildIndex(t, index.Builder{
"c3": &InfoStruct{PackBlobID: "p2345", ContentID: "c3"},
"c4": &InfoStruct{PackBlobID: "p2345", ContentID: "c4"},
})))
Expand Down Expand Up @@ -114,11 +115,11 @@ func testCache(t *testing.T, cache committedContentIndexCache, fakeTime *faketim
}
}

func mustBuildPackIndex(t *testing.T, b packIndexBuilder) gather.Bytes {
func mustBuildIndex(t *testing.T, b index.Builder) gather.Bytes {
t.Helper()

var buf bytes.Buffer
if err := b.Build(&buf, v2IndexVersion); err != nil {
if err := b.Build(&buf, index.Version2); err != nil {
t.Fatal(err)
}

Expand Down
11 changes: 9 additions & 2 deletions repo/content/committed_content_index_disk_cache.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/logging"
)

Expand All @@ -32,15 +33,21 @@ func (c *diskCommittedContentIndexCache) indexBlobPath(indexBlobID blob.ID) stri
return filepath.Join(c.dirname, string(indexBlobID)+simpleIndexSuffix)
}

func (c *diskCommittedContentIndexCache) openIndex(ctx context.Context, indexBlobID blob.ID) (packIndex, error) {
func (c *diskCommittedContentIndexCache) openIndex(ctx context.Context, indexBlobID blob.ID) (index.Index, error) {
fullpath := c.indexBlobPath(indexBlobID)

f, err := c.mmapOpenWithRetry(fullpath)
if err != nil {
return nil, err
}

return openPackIndex(f, c.v1PerContentOverhead)
ndx, err := index.Open(f, c.v1PerContentOverhead)
if err != nil {
f.Close() // nolint:errcheck
return nil, errors.Wrapf(err, "error openind index from %v", indexBlobID)
}

return ndx, nil
}

// mmapOpenWithRetry attempts mmap.Open() with exponential back-off to work around rare issue specific to Windows where
Expand Down
11 changes: 6 additions & 5 deletions repo/content/committed_content_index_mem_cache.go
Expand Up @@ -9,13 +9,14 @@ import (

"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
)

// memoryCommittedContentIndexCache is an in-memory committedContentIndexCache
// keyed by index blob ID.
type memoryCommittedContentIndexCache struct {
	mu sync.Mutex

	// +checklocks:mu
	contents map[blob.ID]index.Index

	v1PerContentOverhead uint32 // +checklocksignore
}
Expand All @@ -31,17 +32,17 @@ func (m *memoryCommittedContentIndexCache) addContentToCache(ctx context.Context
m.mu.Lock()
defer m.mu.Unlock()

ndx, err := openPackIndex(bytes.NewReader(data.ToByteSlice()), m.v1PerContentOverhead)
ndx, err := index.Open(bytes.NewReader(data.ToByteSlice()), m.v1PerContentOverhead)
if err != nil {
return err
return errors.Wrapf(err, "error opening index blob %v", indexBlobID)
}

m.contents[indexBlobID] = ndx

return nil
}

func (m *memoryCommittedContentIndexCache) openIndex(ctx context.Context, indexBlobID blob.ID) (packIndex, error) {
func (m *memoryCommittedContentIndexCache) openIndex(ctx context.Context, indexBlobID blob.ID) (index.Index, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -57,7 +58,7 @@ func (m *memoryCommittedContentIndexCache) expireUnused(ctx context.Context, use
m.mu.Lock()
defer m.mu.Unlock()

n := map[blob.ID]packIndex{}
n := map[blob.ID]index.Index{}

for _, u := range used {
if v, ok := m.contents[u]; ok {
Expand Down
3 changes: 2 additions & 1 deletion repo/content/committed_read_manager.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/kopia/kopia/repo/blob/filesystem"
"github.com/kopia/kopia/repo/blob/sharded"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/logging"
)
Expand Down Expand Up @@ -567,7 +568,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions
actualIndexVersion = legacyIndexVersion
}

if actualIndexVersion < v1IndexVersion || actualIndexVersion > v2IndexVersion {
if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 {
return nil, errors.Errorf("index version %v is not supported", actualIndexVersion)
}

Expand Down

0 comments on commit 5d87d81

Please sign in to comment.