Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read from branch with compaction data #7701

Merged
merged 29 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
deee833
Read from branch with compaction data
idanovo Apr 25, 2024
036a2b4
Remove unneeded test
idanovo Apr 25, 2024
82694ff
Fix comments
idanovo Apr 28, 2024
90059b0
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 1, 2024
c554d87
WIP
idanovo May 2, 2024
0170d65
WIP
idanovo May 2, 2024
5f44d2b
WIP
idanovo May 2, 2024
644a04d
WIP
idanovo May 3, 2024
18742fa
WIP
idanovo May 3, 2024
6cf1337
PR reviews
idanovo May 6, 2024
11a448f
WIP
idanovo May 6, 2024
dfb54e6
Fix review
idanovo May 6, 2024
f609ec2
Added tests
idanovo May 6, 2024
eb9c4e4
Added GC tests
idanovo May 7, 2024
d51ead2
PR review
idanovo May 7, 2024
3adc263
Fix
idanovo May 7, 2024
70c49d0
Fix tests
idanovo May 7, 2024
ca9a9cb
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 8, 2024
29e5085
WIP
idanovo May 8, 2024
c2cef8b
Added compaction to gc test
idanovo May 8, 2024
005f718
Rename function name
idanovo May 8, 2024
2717510
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 9, 2024
0400e6c
Review comments
idanovo May 9, 2024
23221ae
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 15, 2024
1c10a6e
Fix PR review
idanovo May 18, 2024
19d797c
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 18, 2024
2a0f495
Lint
idanovo May 19, 2024
4c46b26
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo Jun 10, 2024
3426e0b
PR review
idanovo Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 114 additions & 52 deletions pkg/catalog/catalog_test.go
idanovo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) {
numRecords int
expectedCalls int
expectedForUncommitted int
compactBranch bool
}{
{
name: "no branches",
Expand Down Expand Up @@ -560,73 +561,91 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
const repositoryID = "repo1"
g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls)
blockAdapter := testutil.NewBlockAdapterByType(t, block.BlockstoreTypeMem)
c := &catalog.Catalog{
Store: g.Sut,
BlockAdapter: blockAdapter,
UGCPrepareMaxFileSize: 500 * 1024,
KVStore: g.KVStore,
}

var (
mark *catalog.GCUncommittedMark
runID string
allRecords []string
)
for {
result, err := c.PrepareGCUncommitted(ctx, repositoryID, mark)
require.NoError(t, err)

// keep or check run id match previous calls
if runID == "" {
runID = result.RunID
} else {
require.Equal(t, runID, result.RunID)
for _, compactBranch := range []bool{false, true} {
t.Run(tt.name, func(t *testing.T) {
const repositoryID = "repo1"
g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls, compactBranch)
blockAdapter := testutil.NewBlockAdapterByType(t, block.BlockstoreTypeMem)
c := &catalog.Catalog{
Store: g.Sut,
BlockAdapter: blockAdapter,
UGCPrepareMaxFileSize: 500 * 1024,
KVStore: g.KVStore,
}

if tt.numRecords == 0 {
require.Equal(t, "", result.Location)
require.Equal(t, "", result.Filename)
} else {
// read parquet information if data was stored to location
objLocation, err := url.JoinPath(result.Location, result.Filename)
var (
mark *catalog.GCUncommittedMark
runID string
allRecords []string
)
for {
result, err := c.PrepareGCUncommitted(ctx, repositoryID, mark)
require.NoError(t, err)
addresses := readPhysicalAddressesFromParquetObject(t, repositoryID, ctx, c, objLocation)
allRecords = append(allRecords, addresses...)
}

mark = result.Mark
if mark == nil {
break
// keep or check run id match previous calls
if runID == "" {
runID = result.RunID
} else {
require.Equal(t, runID, result.RunID)
}

if tt.numRecords == 0 {
require.Equal(t, "", result.Location)
require.Equal(t, "", result.Filename)
} else {
// read parquet information if data was stored to location
objLocation, err := url.JoinPath(result.Location, result.Filename)
require.NoError(t, err)
addresses := readPhysicalAddressesFromParquetObject(t, repositoryID, ctx, c, objLocation)
allRecords = append(allRecords, addresses...)
}

mark = result.Mark
if mark == nil {
break
}
require.Equal(t, runID, result.Mark.RunID)
}
require.Equal(t, runID, result.Mark.RunID)
}

// match expected records found in parquet data
sort.Strings(allRecords)
if diff := deep.Equal(allRecords, expectedRecords); diff != nil {
t.Errorf("Found diff in expected records: %s", diff)
}
})
// match expected records found in parquet data
sort.Strings(allRecords)
if diff := deep.Equal(allRecords, expectedRecords); diff != nil {
t.Errorf("Found diff in expected records: %s", diff)
}
})
}
}
}

func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int) (*gUtils.GravelerTest, []string) {
func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int, compact bool) (*gUtils.GravelerTest, []string) {
t.Helper()

test := gUtils.InitGravelerTest(t)
records := make([][]*graveler.ValueRecord, numBranches)
diffs := make([][]graveler.Diff, numBranches)
var branches []*graveler.BranchRecord
var expectedRecords []string
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved
if numBranches > 0 {
idanovo marked this conversation as resolved.
Show resolved Hide resolved
test.RefManager.EXPECT().GetCommit(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(&graveler.Commit{}, nil)
test.CommittedManager.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(cUtils.NewFakeValueIterator([]*graveler.ValueRecord{}), nil)
}
for i := 0; i < numBranches; i++ {
branchID := graveler.BranchID(fmt.Sprintf("branch%04d", i))
token := graveler.StagingToken(fmt.Sprintf("%s_st%04d", branchID, i))
branches = append(branches, &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}})
branch := &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}}
compactBaseMetaRangeID := graveler.MetaRangeID(fmt.Sprintf("base%04d", i))
commitID := graveler.CommitID(fmt.Sprintf("commit%04d", i))
if !compact {
test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token}, nil)
} else {
branch.CompactedBaseMetaRangeID = compactBaseMetaRangeID
branch.Branch.CommitID = commitID
test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token, CommitID: commitID, CompactedBaseMetaRangeID: compactBaseMetaRangeID}, nil)
}
branches = append(branches, branch)

records[i] = make([]*graveler.ValueRecord, 0, numRecords)
diffs[i] = make([]graveler.Diff, 0, numRecords)
for j := 0; j < numRecords; j++ {
var (
addressType catalog.Entry_AddressType
Expand Down Expand Up @@ -661,6 +680,30 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
})
// we always keep the relative path - as the prepared uncommitted will trim the storage namespace
expectedRecords = append(expectedRecords, fmt.Sprintf("%s_record%04d", branchID, j))

if compact {
diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeAdded,
Key: []byte(e.Address),
Value: &graveler.Value{
Identity: []byte("dont care"),
Data: v,
},
}) // record in compaction and in staging so no need to add it again to expected records
e.Address = fmt.Sprintf("%s_%s", e.Address, "compacted")
v, err = proto.Marshal(&e)
require.NoError(t, err)
diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeAdded,
Key: []byte(e.Address),
Value: &graveler.Value{
Identity: []byte("dont care"),
Data: v,
},
})
// record in compaction but not in staging
expectedRecords = append(expectedRecords, fmt.Sprintf("%s_record%04d_%s", branchID, j, "compacted"))
idanovo marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Add tombstone
Expand All @@ -669,6 +712,11 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
Value: nil,
})

diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeRemoved,
Key: []byte(fmt.Sprintf("%s_tombstone", branchID)),
})

// Add external address
e := catalog.Entry{
Address: fmt.Sprintf("external/address/object_%s", branchID),
Expand All @@ -688,6 +736,15 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
Data: v,
},
})

diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeAdded,
Key: []byte(e.Address),
Value: &graveler.Value{
Identity: []byte("dont care"),
Data: v,
},
})
}

test.GarbageCollectionManager.EXPECT().NewID().Return("TestRunID")
Expand All @@ -699,21 +756,27 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
DefaultBranchID: "main",
},
}
test.RefManager.EXPECT().GetRepository(gomock.Any(), graveler.RepositoryID(repositoryID)).Times(expectedCalls).Return(repository, nil)
test.RefManager.EXPECT().GetRepository(gomock.Any(), graveler.RepositoryID(repositoryID)).MinTimes(1).Return(repository, nil)

// expect tracked addresses does not list branches, so remove one and keep at least the first
test.RefManager.EXPECT().ListBranches(gomock.Any(), gomock.Any()).Times(expectedCalls).Return(gUtils.NewFakeBranchIterator(branches), nil)
test.RefManager.EXPECT().ListBranches(gomock.Any(), gomock.Any()).MinTimes(1).Return(gUtils.NewFakeBranchIterator(branches), nil)
for i := 0; i < len(branches); i++ {
sort.Slice(records[i], func(ii, jj int) bool {
return bytes.Compare(records[i][ii].Key, records[i][jj].Key) < 0
})
test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).AnyTimes().Return(cUtils.NewFakeValueIterator(records[i]))
test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).MinTimes(1).Return(cUtils.NewFakeValueIterator(records[i]))
if compact {
sort.Slice(diffs[i], func(ii, jj int) bool {
return bytes.Compare(diffs[i][ii].Key, diffs[i][jj].Key) < 0
})
test.CommittedManager.EXPECT().Diff(gomock.Any(), repository.StorageNamespace, gomock.Any(), branches[i].CompactedBaseMetaRangeID).MinTimes(1).Return(gUtils.NewDiffIter(diffs[i]), nil)
}
}

if numRecords > 0 {
test.GarbageCollectionManager.EXPECT().
GetUncommittedLocation(gomock.Any(), gomock.Any()).
Times(expectedCalls).
MinTimes(1).
DoAndReturn(func(runID string, sn graveler.StorageNamespace) (string, error) {
return fmt.Sprintf("%s/retention/gc/uncommitted/%s/uncommitted/", "_lakefs", runID), nil
})
Expand All @@ -736,7 +799,6 @@ func readPhysicalAddressesFromParquetObject(t *testing.T, repositoryID string, c
require.NoError(t, err)
bufferFile := buffer.NewBufferFileFromBytes(data)
defer func() { _ = bufferFile.Close() }()

pr, err := reader.NewParquetReader(bufferFile, new(catalog.UncommittedParquetObject), 4)
require.NoError(t, err)

Expand Down
8 changes: 0 additions & 8 deletions pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type FakeGraveler struct {
KeyValue map[string]*graveler.Value
Err error
ListIteratorFactory func() graveler.ValueIterator
ListStagingIteratorFactory func(token graveler.StagingToken) graveler.ValueIterator
DiffIteratorFactory func() graveler.DiffIterator
RepositoryIteratorFactory func() graveler.RepositoryIterator
BranchIteratorFactory func() graveler.BranchIterator
Expand Down Expand Up @@ -125,13 +124,6 @@ func (g *FakeGraveler) DeleteBatch(ctx context.Context, repository *graveler.Rep
return nil
}

// ListStaging returns the fake's configured error when one is set; otherwise
// it returns the iterator that ListStagingIteratorFactory produces for the
// branch's staging token. The context and the batch-size arguments are ignored.
func (g *FakeGraveler) ListStaging(_ context.Context, b *graveler.Branch, _ int) (graveler.ValueIterator, error) {
	// A preset error takes precedence over any iterator.
	if err := g.Err; err != nil {
		return nil, err
	}
	stagingIter := g.ListStagingIteratorFactory(b.StagingToken)
	return stagingIter, nil
}

func (g *FakeGraveler) List(_ context.Context, _ *graveler.RepositoryRecord, _ graveler.Ref, _ int) (graveler.ValueIterator, error) {
if g.Err != nil {
return nil, g.Err
Expand Down
Loading
Loading