Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read from branch with compaction data #7701

Merged
merged 29 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
deee833
Read from branch with compaction data
idanovo Apr 25, 2024
036a2b4
Remove unneeded test
idanovo Apr 25, 2024
82694ff
Fix comments
idanovo Apr 28, 2024
90059b0
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 1, 2024
c554d87
WIP
idanovo May 2, 2024
0170d65
WIP
idanovo May 2, 2024
5f44d2b
WIP
idanovo May 2, 2024
644a04d
WIP
idanovo May 3, 2024
18742fa
WIP
idanovo May 3, 2024
6cf1337
PR reviews
idanovo May 6, 2024
11a448f
WIP
idanovo May 6, 2024
dfb54e6
Fix review
idanovo May 6, 2024
f609ec2
Added tests
idanovo May 6, 2024
eb9c4e4
Added GC tests
idanovo May 7, 2024
d51ead2
PR review
idanovo May 7, 2024
3adc263
Fix
idanovo May 7, 2024
70c49d0
Fix tests
idanovo May 7, 2024
ca9a9cb
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 8, 2024
29e5085
WIP
idanovo May 8, 2024
c2cef8b
Added compaction to gc test
idanovo May 8, 2024
005f718
Rename function name
idanovo May 8, 2024
2717510
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 9, 2024
0400e6c
Review comments
idanovo May 9, 2024
23221ae
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 15, 2024
1c10a6e
Fix PR review
idanovo May 18, 2024
19d797c
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo May 18, 2024
2a0f495
Lint
idanovo May 19, 2024
4c46b26
Merge branch 'master' of https://github.com/treeverse/lakeFS into rea…
idanovo Jun 10, 2024
3426e0b
PR review
idanovo Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions pkg/catalog/catalog_test.go
idanovo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) {
numRecords int
expectedCalls int
expectedForUncommitted int
compactBranch bool
}{
{
name: "no branches",
Expand All @@ -546,23 +547,44 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) {
numRecords: 0,
expectedCalls: 1,
},
{
name: "no objects",
numBranch: 3,
numRecords: 0,
expectedCalls: 1,
compactBranch: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Rather than double each test scenario, I would probably just loop over both scenarios by saying for _, compactBranch := range{false, true} { t.Run(...) } on l. 585.

},
{
name: "sanity",
numBranch: 5,
numRecords: 3,
expectedCalls: 1,
},
{
name: "compacted",
numBranch: 5,
numRecords: 3,
expectedCalls: 1,
compactBranch: true,
},
{
name: "tokenized",
numBranch: 500,
numRecords: 500,
expectedCalls: 2,
},
{
name: "tokenized and compacted",
numBranch: 500,
numRecords: 500,
expectedCalls: 2,
compactBranch: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
const repositoryID = "repo1"
g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls)
g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls, tt.compactBranch)
blockAdapter := testutil.NewBlockAdapterByType(t, block.BlockstoreTypeMem)
c := &catalog.Catalog{
Store: g.Sut,
Expand Down Expand Up @@ -614,19 +636,35 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) {
}
}

func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int) (*gUtils.GravelerTest, []string) {
func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int, compact bool) (*gUtils.GravelerTest, []string) {
t.Helper()

test := gUtils.InitGravelerTest(t)
records := make([][]*graveler.ValueRecord, numBranches)
diffs := make([][]graveler.Diff, numBranches)
var branches []*graveler.BranchRecord
var expectedRecords []string
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved
if numBranches > 0 {
idanovo marked this conversation as resolved.
Show resolved Hide resolved
test.RefManager.EXPECT().GetCommit(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(&graveler.Commit{}, nil)
test.CommittedManager.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(cUtils.NewFakeValueIterator([]*graveler.ValueRecord{}), nil)
}
for i := 0; i < numBranches; i++ {
branchID := graveler.BranchID(fmt.Sprintf("branch%04d", i))
token := graveler.StagingToken(fmt.Sprintf("%s_st%04d", branchID, i))
branches = append(branches, &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}})
branch := &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}}
compactBaseMetaRangeID := graveler.MetaRangeID(fmt.Sprintf("base%04d", i))
commitID := graveler.CommitID(fmt.Sprintf("commit%04d", i))
if !compact {
test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token}, nil)
} else {
branch.CompactedBaseMetaRangeID = compactBaseMetaRangeID
branch.Branch.CommitID = commitID
test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token, CommitID: commitID, CompactedBaseMetaRangeID: compactBaseMetaRangeID}, nil)
}
branches = append(branches, branch)

records[i] = make([]*graveler.ValueRecord, 0, numRecords)
diffs[i] = make([]graveler.Diff, 0, numRecords)
for j := 0; j < numRecords; j++ {
var (
addressType catalog.Entry_AddressType
Expand Down Expand Up @@ -669,6 +707,11 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
Value: nil,
})

diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeRemoved,
Key: []byte(fmt.Sprintf("%s_tombstone", branchID)),
})

// Add external address
e := catalog.Entry{
Address: fmt.Sprintf("external/address/object_%s", branchID),
Expand All @@ -688,6 +731,15 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
Data: v,
},
})

diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeAdded,
Key: []byte(e.Address),
Value: &graveler.Value{
Identity: []byte("dont care"),
Data: v,
},
})
}

test.GarbageCollectionManager.EXPECT().NewID().Return("TestRunID")
Expand All @@ -708,6 +760,9 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
return bytes.Compare(records[i][ii].Key, records[i][jj].Key) < 0
})
test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).AnyTimes().Return(cUtils.NewFakeValueIterator(records[i]))
if compact {
test.CommittedManager.EXPECT().Diff(gomock.Any(), repository.StorageNamespace, gomock.Any(), branches[i].CompactedBaseMetaRangeID).AnyTimes().Return(gUtils.NewDiffIter(diffs[i]), nil)
}
}

if numRecords > 0 {
Expand Down Expand Up @@ -736,7 +791,6 @@ func readPhysicalAddressesFromParquetObject(t *testing.T, repositoryID string, c
require.NoError(t, err)
bufferFile := buffer.NewBufferFileFromBytes(data)
defer func() { _ = bufferFile.Close() }()

pr, err := reader.NewParquetReader(bufferFile, new(catalog.UncommittedParquetObject), 4)
require.NoError(t, err)

Expand Down
8 changes: 0 additions & 8 deletions pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type FakeGraveler struct {
KeyValue map[string]*graveler.Value
Err error
ListIteratorFactory func() graveler.ValueIterator
ListStagingIteratorFactory func(token graveler.StagingToken) graveler.ValueIterator
DiffIteratorFactory func() graveler.DiffIterator
RepositoryIteratorFactory func() graveler.RepositoryIterator
BranchIteratorFactory func() graveler.BranchIterator
Expand Down Expand Up @@ -125,13 +124,6 @@ func (g *FakeGraveler) DeleteBatch(ctx context.Context, repository *graveler.Rep
return nil
}

func (g *FakeGraveler) ListStaging(_ context.Context, b *graveler.Branch, _ int) (graveler.ValueIterator, error) {
if g.Err != nil {
return nil, g.Err
}
return g.ListStagingIteratorFactory(b.StagingToken), nil
}

func (g *FakeGraveler) List(_ context.Context, _ *graveler.RepositoryRecord, _ graveler.Ref, _ int) (graveler.ValueIterator, error) {
if g.Err != nil {
return nil, g.Err
Expand Down
109 changes: 75 additions & 34 deletions pkg/catalog/gc_write_uncommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,74 +17,115 @@ func gcWriteUncommitted(ctx context.Context, store Store, repository *graveler.R
}
pw.CompressionType = parquet.CompressionCodec_GZIP

// write uncommitted data from branches
it, err := NewUncommittedIterator(ctx, store, repository)
branchIterator, err := store.ListBranches(ctx, repository)
if err != nil {
return nil, false, err
}
defer it.Close()
defer branchIterator.Close()

if mark != nil {
it.SeekGE(mark.BranchID, mark.Path)
}
normalizedStorageNamespace := normalizeStorageNamespace(string(repository.StorageNamespace))

normalizedStorageNamespace := string(repository.StorageNamespace)
if !strings.HasSuffix(normalizedStorageNamespace, DefaultPathDelimiter) {
normalizedStorageNamespace += DefaultPathDelimiter
if mark != nil {
branchIterator.SeekGE(mark.BranchID)
}

count := 0
var nextMark *GCUncommittedMark
hasData := false
startTime := time.Now()
for branchIterator.Next() {
nextMark, count, err = processBranch(ctx, store, repository, branchIterator.Value().BranchID, runID, pw, normalizedStorageNamespace, maxFileSize, prepareDuration, w, count, mark, startTime)
if err != nil {
return nil, false, err
}
if nextMark != nil {
break
}
}
if branchIterator.Err() != nil {
return nil, false, branchIterator.Err()
}

if err := pw.WriteStop(); err != nil {
return nil, false, err
}

if count > 0 {
hasData = true
}
return nextMark, hasData, err
}

func normalizeStorageNamespace(namespace string) string {
if !strings.HasSuffix(namespace, DefaultPathDelimiter) {
namespace += DefaultPathDelimiter
}
return namespace
}
arielshaqed marked this conversation as resolved.
Show resolved Hide resolved

func processBranch(ctx context.Context, store Store, repository *graveler.RepositoryRecord, branchID graveler.BranchID, runID string, parquetWriter *writer.ParquetWriter, normalizedStorageNamespace string, maxFileSize int64, prepareDuration time.Duration, writer *UncommittedWriter, count int, mark *GCUncommittedMark, startTime time.Time) (*GCUncommittedMark, int, error) {
diffIterator, err := store.DiffUncommitted(ctx, repository, branchID)
if err != nil {
return nil, 0, err
}
defer diffIterator.Close()

var nextMark *GCUncommittedMark
for it.Next() {
entry := it.Value()
// Skip if entry is tombstone
if entry.Entry == nil {

if mark != nil && mark.BranchID == branchID && mark.Path != "" {
diffIterator.SeekGE(graveler.Key(mark.Path))
}

for diffIterator.Next() {
diff := diffIterator.Value()

// Skip tombstones
if diff.Type == graveler.DiffTypeRemoved {
continue
}
// Skip non-relative that address outside the storage namespace

entry, err := ValueToEntry(diff.Value)
if err != nil {
return nil, 0, err
}

// Skip non-relative addresses outside the storage namespace
entryAddress := entry.Address
if entry.Entry.AddressType != Entry_RELATIVE {
if entry.AddressType != Entry_RELATIVE {
if !strings.HasPrefix(entry.Address, normalizedStorageNamespace) {
continue
}
entryAddress = entryAddress[len(normalizedStorageNamespace):]
}

count += 1
count++
if count%gcPeriodicCheckSize == 0 {
if err := pw.Flush(true); err != nil {
return nil, false, err
if err := parquetWriter.Flush(true); err != nil {
return nil, 0, err
}
}

// check if we need to stop - based on max file size or prepare duration.
// prepare duration is optional, if 0 it will be ignored.
// prepare duration is used to stop the process in cases we scan a lot of data, and we want to stop
// so the api call will not time out.
if w.Size() > maxFileSize || (prepareDuration > 0 && time.Since(startTime) > prepareDuration) {
if writer.Size() > maxFileSize || (prepareDuration > 0 && time.Since(startTime) > prepareDuration) {
nextMark = &GCUncommittedMark{
RunID: runID,
BranchID: entry.branchID,
Path: entry.Path,
BranchID: branchID,
Path: Path(diff.Key.String()),
}
break
}
if err = pw.Write(UncommittedParquetObject{

err = parquetWriter.Write(UncommittedParquetObject{
PhysicalAddress: entryAddress,
CreationDate: entry.LastModified.AsTime().Unix(),
}); err != nil {
return nil, false, err
})
if err != nil {
return nil, 0, err
}
}
if err := it.Err(); err != nil {
return nil, false, err
}
// stop writer before we return
if err := pw.WriteStop(); err != nil {
return nil, false, err
}

// Finished reading all staging area - return marker to switch processing tracked physical addresses
hasData := count > 0
return nextMark, hasData, nil
return nextMark, count, diffIterator.Err()
}
Loading
Loading