Skip to content

Commit

Permalink
Diff performance improvement by trim lineage (#921)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzahij committed Nov 16, 2020
1 parent 00d09ce commit 34185da
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 64 deletions.
5 changes: 5 additions & 0 deletions catalog/cataloger_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func TestCataloger_Merge_FromParentNoChangesInChild(t *testing.T) {
}); diff != nil {
t.Fatal("Merge Summary", diff)
}
// merge again - nothing should happen
res, err = c.Merge(ctx, repository, "master", "branch1", "tester", "", nil)
if err != ErrNoDifferenceWasFound {
t.Fatal("Merge() expected ErrNoDifferenceWasFound, got:", err)
}
// TODO(barak): enable test after diff between commits is supported
//differences, _, err := c.Diff(ctx, repository, commitLog.Parents[0], commitLog.Parents[1], -1, "")
//testutil.MustDo(t, "diff merge changes", err)
Expand Down
74 changes: 40 additions & 34 deletions catalog/db_branch_scanner.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package catalog

import (
"strconv"
"strings"

sq "github.com/Masterminds/squirrel"
"github.com/treeverse/lakefs/db"
)
Expand All @@ -18,7 +15,8 @@ type DBBranchScanner struct {
tx db.Tx
branchID int64
commitID CommitID
commitsWhere string
minCommitID CommitID
commitsWhere sq.Sqlizer
buf []*DBScannerEntry
idx int
after string
Expand All @@ -27,60 +25,68 @@ type DBBranchScanner struct {
value *DBScannerEntry
}

func NewDBBranchScanner(tx db.Tx, branchID int64, commitID CommitID, opts DBScannerOptions) *DBBranchScanner {
func NewDBBranchScanner(tx db.Tx, branchID int64, commitID, minCommitID CommitID, opts DBScannerOptions) *DBBranchScanner {
s := &DBBranchScanner{
tx: tx,
branchID: branchID,
idx: 0,
commitID: commitID,
opts: opts,
after: opts.After,
tx: tx,
branchID: branchID,
idx: 0,
commitID: commitID,
minCommitID: minCommitID,
opts: opts,
after: opts.After,
}
if s.opts.BufferSize == 0 {
s.opts.BufferSize = DBScannerDefaultBufferSize
}
s.buf = make([]*DBScannerEntry, 0, s.opts.BufferSize)
commitsWhere, err := getRelevantCommitsCondition(tx, branchID, commitID)
s.err = err
s.commitsWhere = commitsWhere
s.commitsWhere, s.err = s.buildCommitsWherePart()
return s
}

func (s *DBBranchScanner) SetAdditionalWhere(part sq.Sqlizer) {
s.opts.AdditionalWhere = part
}

func getRelevantCommitsCondition(tx db.Tx, branchID int64, commitID CommitID) (string, error) {
var branchMaxCommitID CommitID
var commits []string
var commitsWhere string
if commitID == UncommittedID {
return "", nil
func (s *DBBranchScanner) buildCommitsWherePart() (sq.Sqlizer, error) {
if s.commitID == UncommittedID {
return nil, nil
}
if commitID == UncommittedID || commitID == CommittedID {
var branchMaxCommitID CommitID
if s.commitID == CommittedID {
branchMaxCommitID = MaxCommitID
} else {
branchMaxCommitID = commitID
branchMaxCommitID = s.commitID
}
// commit_id name is changed so that sorting will be performed on the numeric value, not the string value (where "10" is less than "2")
sql := "SELECT commit_id::text as str_commit_id FROM catalog_commits WHERE branch_id = $1 AND commit_id <= $2 ORDER BY commit_id limit $3"
err := tx.Select(&commits, sql, branchID, branchMaxCommitID, BranchScannerMaxCommitsInFilter+1)
var commits []int64
err := s.tx.Select(&commits,
`SELECT commit_id FROM catalog_commits WHERE branch_id = $1 AND commit_id BETWEEN $2 AND $3 ORDER BY commit_id LIMIT $4`,
s.branchID, s.minCommitID+1, branchMaxCommitID, BranchScannerMaxCommitsInFilter+1)
if err != nil {
return "", err
return nil, err
}
if commitID == UncommittedID {
commits = append(commits, strconv.FormatInt(int64(MaxCommitID), 10))
if s.commitID == UncommittedID {
commits = append(commits, int64(MaxCommitID))
}
if len(commits) == 0 {
commits = append(commits, "-1") // this will actually never happen, since each branch has an initial branch
// anyway - there is no commit id -1
// this will actually never happen, since each branch has an initial branch
commits = []int64{1}
}

var wherePart sq.Sqlizer
if len(commits) <= BranchScannerMaxCommitsInFilter {
commitsWhere = "min_commit in (" + strings.Join(commits, `,`) + ")"
} else {
commitsWhere = "min_commit BETWEEN 1 AND " + commits[len(commits)-1]
return sq.Or{
sq.Eq{"min_commit": commits},
sq.Eq{"max_commit": append(commits, int64(s.minCommitID))},
}, nil
}

upperCommitID := commits[len(commits)-1]
wherePart = sq.Or{
sq.Expr("min_commit BETWEEN ? AND ?", s.minCommitID+1, upperCommitID),
sq.Expr("max_commit BETWEEN ? AND ?", s.minCommitID, upperCommitID),
}
return commitsWhere, nil
return wherePart, nil
}

func (s *DBBranchScanner) Next() bool {
Expand Down Expand Up @@ -159,7 +165,7 @@ func (s *DBBranchScanner) buildQuery() sq.SelectBuilder {
if s.after != "" {
q = q.Where("path > ?", s.after)
}
if s.commitsWhere != "" {
if s.commitsWhere != nil {
q = q.Where(s.commitsWhere)
}
if len(s.opts.AdditionalFields) > 0 {
Expand Down
8 changes: 4 additions & 4 deletions catalog/db_branch_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestDBBranchScanner(t *testing.T) {

t.Run("empty", func(t *testing.T) {
_, _ = conn.Transact(func(tx db.Tx) (interface{}, error) {
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, DBScannerOptions{BufferSize: 64})
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, 0, DBScannerOptions{BufferSize: 64})
firstNext := scanner.Next()
if firstNext {
t.Fatalf("first entry should be false")
Expand All @@ -54,7 +54,7 @@ func TestDBBranchScanner(t *testing.T) {

t.Run("additional_fields", func(t *testing.T) {
_, _ = conn.Transact(func(tx db.Tx) (interface{}, error) {
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, DBScannerOptions{
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, 0, DBScannerOptions{
AdditionalFields: []string{"checksum", "physical_address"},
})
testedSomething := false
Expand All @@ -81,7 +81,7 @@ func TestDBBranchScanner(t *testing.T) {
t.Run("additional_where", func(t *testing.T) {
_, _ = conn.Transact(func(tx db.Tx) (interface{}, error) {
p := fmt.Sprintf("Obj-%04d", numberOfObjects-5)
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, DBScannerOptions{
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, 0, DBScannerOptions{
AdditionalWhere: sq.Expr("path=?", p),
})
var ent *DBScannerEntry
Expand All @@ -106,7 +106,7 @@ func TestDBBranchScanner(t *testing.T) {
_, _ = conn.Transact(func(tx db.Tx) (interface{}, error) {
branchID, err := getBranchID(tx, repository, branchName, LockTypeNone)
testutil.MustDo(t, "get branch ID", err)
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, DBScannerOptions{BufferSize: bufSize})
scanner := NewDBBranchScanner(tx, branchID, UncommittedID, 0, DBScannerOptions{BufferSize: bufSize})
i := 0
for scanner.Next() {
o := scanner.Value()
Expand Down
29 changes: 17 additions & 12 deletions catalog/db_diff_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,38 @@ func (s *DiffScanner) diffFromParent(tx db.Tx, params doDiffParams, scannerOpts
}
err = tx.Get(&s.childLastFromParentCommitID, query, args...)
if err != nil {
return nil, fmt.Errorf("get child last commit failed: %w", err)
}
leftLineage, err := getLineage(tx, params.LeftBranchID, CommittedID)
if err != nil {
return nil, fmt.Errorf("get left branch lineage: %w", err)
return nil, fmt.Errorf("get child last commit: %w", err)
}
rightLineage, err := getLineage(tx, params.RightBranchID, UncommittedID)
if err != nil {
return nil, fmt.Errorf("get right branch lineage: %w", err)
}
leftLineage, err := getLineage(tx, params.LeftBranchID, CommittedID)
if err != nil {
return nil, fmt.Errorf("get left branch lineage: %w", err)
}
// If some ancestor branch commit id is the same for parent and child - then the parent does not need to read it
// so it is trimmed from the parent lineage
if len(rightLineage)-len(leftLineage) != 1 {
if len(rightLineage)-len(leftLineage) != 1 || len(rightLineage) == 0 {
return nil, ErrLineageCorrupted
}
minMinCommit := []CommitID{rightLineage[0].CommitID} // commit ID of parent, as known to child
for i := range leftLineage {
if leftLineage[i].CommitID == rightLineage[i+1].CommitID {
leftLineage = leftLineage[:i]
break
}
minMinCommit = append(minMinCommit, rightLineage[i+1].CommitID)
}

scannerOpts.Lineage = leftLineage
s.leftScanner = NewDBLineageScanner(tx, params.LeftBranchID, CommittedID, scannerOpts)
scannerOpts.Lineage = rightLineage
s.rightScanner = NewDBLineageScanner(tx, params.RightBranchID, UncommittedID, scannerOpts)
s.childLineage = rightLineage
rightOpts := scannerOpts
rightOpts.Lineage = rightLineage
s.rightScanner = NewDBLineageScanner(tx, params.RightBranchID, UncommittedID, rightOpts)

leftOpts := scannerOpts
leftOpts.Lineage = leftLineage
leftOpts.MinCommits = minMinCommit
s.leftScanner = NewDBLineageScanner(tx, params.LeftBranchID, CommittedID, leftOpts)
return s, nil
}

Expand All @@ -124,7 +129,7 @@ func (s *DiffScanner) diffFromChild(tx db.Tx, params doDiffParams, scannerOpts D
if err != nil {
return nil, err
}
s.leftScanner = NewDBBranchScanner(tx, params.LeftBranchID, CommittedID, scannerOpts.DBScannerOptions)
s.leftScanner = NewDBBranchScanner(tx, params.LeftBranchID, CommittedID, 1, scannerOpts.DBScannerOptions)
s.rightScanner = NewDBLineageScanner(tx, params.RightBranchID, UncommittedID, scannerOpts)
return s, nil
}
Expand Down
36 changes: 22 additions & 14 deletions catalog/db_lineage_scanner.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package catalog

import (
"errors"
"fmt"

sq "github.com/Masterminds/squirrel"
Expand All @@ -21,16 +22,20 @@ type DBLineageScanner struct {

type DBLineageScannerOptions struct {
DBScannerOptions
Lineage []lineageCommit
Lineage []lineageCommit
MinCommits []CommitID
}

var ErrMinCommitsMismatch = errors.New("MinCommits length mismatch")

func NewDBLineageScanner(tx db.Tx, branchID int64, commitID CommitID, opts DBLineageScannerOptions) *DBLineageScanner {
s := &DBLineageScanner{
tx: tx,
branchID: branchID,
commitID: commitID,
opts: opts,
}
s.buildScanners()
return s
}

Expand All @@ -47,9 +52,6 @@ func (s *DBLineageScanner) Next() bool {
if s.ended {
return false
}
if !s.ensureBranchScanners() {
return false
}

// select lowest entry based on path
var selectedEntry *DBScannerEntry
Expand Down Expand Up @@ -102,34 +104,40 @@ func (s *DBLineageScanner) ReadLineage() ([]lineageCommit, error) {
return getLineage(s.tx, s.branchID, s.commitID)
}

func (s *DBLineageScanner) ensureBranchScanners() bool {
var lineage []lineageCommit
func (s *DBLineageScanner) buildScanners() {
var err error
if s.scanners != nil {
return true
}
var lineage []lineageCommit
if s.opts.Lineage != nil {
lineage = s.opts.Lineage
} else {
lineage, err = s.ReadLineage()
}
if err != nil {
s.err = fmt.Errorf("getting lineage: %w", err)
return false
return
}
s.scanners = make([]*DBBranchScanner, len(lineage)+1)
s.scanners[0] = NewDBBranchScanner(s.tx, s.branchID, s.commitID, s.opts.DBScannerOptions)
// use min commits or allocate default
minCommits := s.opts.MinCommits
if minCommits == nil {
minCommits = make([]CommitID, len(s.scanners))
for i := 0; i < len(minCommits); i++ {
minCommits[i] = 1
}
} else if len(minCommits) != len(s.scanners) {
s.err = ErrMinCommitsMismatch
}
s.scanners[0] = NewDBBranchScanner(s.tx, s.branchID, s.commitID, minCommits[0], s.opts.DBScannerOptions)
for i, bl := range lineage {
s.scanners[i+1] = NewDBBranchScanner(s.tx, bl.BranchID, bl.CommitID, s.opts.DBScannerOptions)
s.scanners[i+1] = NewDBBranchScanner(s.tx, bl.BranchID, bl.CommitID, minCommits[i+1], s.opts.DBScannerOptions)
}
for _, branchScanner := range s.scanners {
if branchScanner.Next() {
continue
}
if err := branchScanner.Err(); err != nil {
s.err = fmt.Errorf("getting entry from branch ID %d: %w", branchScanner.branchID, err)
return false
return
}
}
return true
}

0 comments on commit 34185da

Please sign in to comment.