Skip to content

Commit

Permalink
Merge pull request #610 from treeverse/bugfix/use-db-time
Browse files Browse the repository at this point in the history
Create repositories, branches, commits, and merges using time on DB
  • Loading branch information
arielshaqed committed Sep 14, 2020
2 parents aa21c37 + b994757 commit f387784
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 53 deletions.
9 changes: 0 additions & 9 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/treeverse/lakefs/catalog/params"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
Expand Down Expand Up @@ -181,7 +180,6 @@ type CacheConfig struct {
// cataloger main catalog implementation based on mvcc
type cataloger struct {
params.Catalog
clock clock.Clock
log logging.Logger
db db.Database
wg sync.WaitGroup
Expand All @@ -194,12 +192,6 @@ type cataloger struct {

type CatalogerOption func(*cataloger)

func WithClock(newClock clock.Clock) CatalogerOption {
return func(c *cataloger) {
c.clock = newClock
}
}

func WithCacheEnabled(b bool) CatalogerOption {
return func(c *cataloger) {
c.Cache.Enabled = b
Expand Down Expand Up @@ -247,7 +239,6 @@ func WithParams(p params.Catalog) CatalogerOption {

func NewCataloger(db db.Database, options ...CatalogerOption) Cataloger {
c := &cataloger{
clock: clock.New(),
log: logging.Default().WithField("service_name", "cataloger"),
db: db,
dedupCh: make(chan *dedupRequest, dedupChannelSize),
Expand Down
13 changes: 8 additions & 5 deletions catalog/cataloger_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalog
import (
"context"
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/treeverse/lakefs/db"
Expand Down Expand Up @@ -57,11 +58,13 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
}

// insert commit record
creationDate := c.clock.Now()
_, err = tx.Exec(`INSERT INTO catalog_commits (branch_id,commit_id,committer,message,creation_date,metadata,merge_type,previous_commit_id)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)`,
branchID, commitID, committer, message, creationDate, metadata, RelationTypeNone, lastCommitID)
if err != nil {
var creationDate time.Time
if err = tx.Get(&creationDate,
`INSERT INTO catalog_commits (branch_id,commit_id,committer,message,creation_date,metadata,merge_type,previous_commit_id)
VALUES ($1,$2,$3,$4,transaction_timestamp(),$5,$6,$7)
RETURNING creation_date`,
branchID, commitID, committer, message, metadata, RelationTypeNone, lastCommitID,
); err != nil {
return nil, err
}
reference := MakeReference(branch, commitID)
Expand Down
26 changes: 20 additions & 6 deletions catalog/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@ import (

"github.com/davecgh/go-spew/spew"

"github.com/benbjohnson/clock"
"github.com/treeverse/lakefs/testutil"
)

func timeDifference(a, b time.Time) time.Duration {
diff := a.Sub(b)
if diff < time.Duration(0) {
return -diff
}
return diff
}

func TestCataloger_Commit(t *testing.T) {
ctx := context.Background()
now := time.Now().Round(time.Minute)
fakeClock := clock.NewMock()
fakeClock.Set(now)
c := testCataloger(t, WithClock(fakeClock))
c := testCataloger(t)
defer func() { _ = c.Close() }()
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
meta := Metadata{"key1": "val1", "key2": "val2"}
Expand All @@ -34,6 +38,7 @@ func TestCataloger_Commit(t *testing.T) {
PhysicalAddress: fileAddr,
Size: int64(i) + 1,
Metadata: meta,
CreationDate: time.Now(),
}, CreateEntryParams{}); err != nil {
t.Fatal("create entry for testing", fileName, err)
}
Expand All @@ -59,7 +64,7 @@ func TestCataloger_Commit(t *testing.T) {
Reference: "~KJ8Wd1Rs96Z",
Committer: "tester",
Message: "Simple commit",
CreationDate: now,
CreationDate: time.Now(),
Metadata: meta,
Parents: []string{"~KJ8Wd1Rs96Y"},
},
Expand Down Expand Up @@ -92,11 +97,20 @@ func TestCataloger_Commit(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
now := time.Now()
got, err := c.Commit(ctx, tt.args.repository, tt.args.branch, tt.args.message, tt.args.committer, tt.args.metadata)
if (err != nil) != tt.wantErr {
t.Errorf("Commit() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != nil {
if timeDifference(got.CreationDate, now) > 10*time.Second {
t.Errorf("expected creation time %s, got very different %s", got.CreationDate, now)
}
if tt.want != nil {
got.CreationDate = tt.want.CreationDate
}
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Commit() got = %s, want = %s", spew.Sdump(got), spew.Sdump(tt.want))
}
Expand Down
25 changes: 12 additions & 13 deletions catalog/cataloger_create_branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalog
import (
"context"
"fmt"
"time"

"github.com/treeverse/lakefs/db"
)
Expand Down Expand Up @@ -51,23 +52,21 @@ func (c *cataloger) CreateBranch(ctx context.Context, repository, branch string,
return nil, fmt.Errorf("insert branch: %w", err)
}

// create initial commit
creationDate := c.clock.Now()

insertReturns := struct {
CommitID CommitID `db:"commit_id"`
MergeSourceCommit CommitID `db:"merge_source_commit"`
CommitID CommitID `db:"commit_id"`
MergeSourceCommit CommitID `db:"merge_source_commit"`
TransactionTimestamp time.Time `db:"transaction_timestamp"`
}{}
commitMsg := fmt.Sprintf(createBranchCommitMessageFormat, branch, sourceBranch)
err = tx.Get(&insertReturns, `INSERT INTO catalog_commits (branch_id,commit_id,previous_commit_id,committer,message,
creation_date,merge_source_branch,merge_type,lineage_commits,merge_source_commit)
VALUES ($1,nextval('catalog_commit_id_seq'),0,$2,$3,$4,$5,'from_parent',
(select (select max(commit_id) from catalog_commits where branch_id=$5)||
(select distinct on (branch_id) lineage_commits from catalog_commits
where branch_id=$5 and merge_type='from_parent' order by branch_id,commit_id desc))
,(select max(commit_id) from catalog_commits where branch_id=$5 ))
RETURNING commit_id,merge_source_commit`,
branchID, CatalogerCommitter, commitMsg, creationDate, sourceBranchID)
VALUES ($1,nextval('catalog_commit_id_seq'),0,$2,$3,transaction_timestamp(),$4,'from_parent',
(select (select max(commit_id) from catalog_commits where branch_id=$4) ||
(select distinct on (branch_id) lineage_commits from catalog_commits
where branch_id=$4 and merge_type='from_parent' order by branch_id,commit_id desc))
,(select max(commit_id) from catalog_commits where branch_id=$4 ))
RETURNING commit_id,merge_source_commit,transaction_timestamp()`,
branchID, CatalogerCommitter, commitMsg, sourceBranchID)
if err != nil {
return nil, fmt.Errorf("insert commit: %w", err)
}
Expand All @@ -77,7 +76,7 @@ func (c *cataloger) CreateBranch(ctx context.Context, repository, branch string,
commitLog := &CommitLog{
Committer: CatalogerCommitter,
Message: commitMsg,
CreationDate: creationDate,
CreationDate: insertReturns.TransactionTimestamp,
Reference: reference,
Parents: []string{parentReference},
}
Expand Down
7 changes: 3 additions & 4 deletions catalog/cataloger_create_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ func (c *cataloger) CreateRepository(ctx context.Context, repository string, sto
}

// create repository with ref to branch
creationDate := c.clock.Now()
if _, err := tx.Exec(`INSERT INTO catalog_repositories (id,name,storage_namespace,creation_date,default_branch)
VALUES ($1,$2,$3,$4,$5)`, repoID, repository, storageNamespace, creationDate, branchID); err != nil {
VALUES ($1,$2,$3,transaction_timestamp(),$4)`, repoID, repository, storageNamespace, branchID); err != nil {
return nil, fmt.Errorf("insert repository: %w", err)
}

Expand All @@ -51,8 +50,8 @@ func (c *cataloger) CreateRepository(ctx context.Context, repository string, sto

// create initial commit
_, err := tx.Exec(`INSERT INTO catalog_commits (branch_id,commit_id,committer,message,creation_date,previous_commit_id)
VALUES ($1,nextval('catalog_commit_id_seq'),$2,$3,$4,0)`,
branchID, CatalogerCommitter, createRepositoryCommitMessage, creationDate)
VALUES ($1,nextval('catalog_commit_id_seq'),$2,$3,transaction_timestamp(),0)`,
branchID, CatalogerCommitter, createRepositoryCommitMessage)
if err != nil {
return nil, fmt.Errorf("insert commit: %w", err)
}
Expand Down
25 changes: 13 additions & 12 deletions catalog/cataloger_get_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"

"github.com/davecgh/go-spew/spew"
"github.com/treeverse/lakefs/testutil"
)

func TestCataloger_GetCommit(t *testing.T) {
ctx := context.Background()
now := time.Now().Round(time.Minute)
mockClock := clock.NewMock()
mockClock.Set(now)
c := testCataloger(t, WithClock(mockClock))
c := testCataloger(t)
defer func() { _ = c.Close() }()

// test data
Expand Down Expand Up @@ -52,7 +47,7 @@ func TestCataloger_GetCommit(t *testing.T) {
Reference: "~KJ8Wd1Rs96Z",
Committer: "tester0",
Message: "Commit0",
CreationDate: now,
CreationDate: time.Now(),
Metadata: Metadata{"k0": "v0"},
Parents: []string{"~KJ8Wd1Rs96Y"},
},
Expand All @@ -65,7 +60,7 @@ func TestCataloger_GetCommit(t *testing.T) {
Reference: "~KJ8Wd1Rs96a",
Committer: "tester1",
Message: "Commit1",
CreationDate: now,
CreationDate: time.Now(),
Metadata: Metadata{"k1": "v1"},
Parents: []string{"~KJ8Wd1Rs96Z"},
},
Expand All @@ -92,11 +87,20 @@ func TestCataloger_GetCommit(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
now := time.Now()
got, err := c.GetCommit(ctx, repository, tt.reference)
if (err != nil) != tt.wantErr {
t.Errorf("GetCommit() error = %s, wantErr %t", err, tt.wantErr)
return
}
if got != nil {
if timeDifference(got.CreationDate, now) > 10*time.Second {
t.Errorf("expected creation time %s, got very different %s", got.CreationDate, now)
}
if tt.want != nil {
got.CreationDate = tt.want.CreationDate
}
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetCommit() got = %s, want %s", spew.Sdump(got), spew.Sdump(tt.want))
}
Expand All @@ -106,10 +110,7 @@ func TestCataloger_GetCommit(t *testing.T) {

func TestCataloger_GetMergeCommit(t *testing.T) {
ctx := context.Background()
now := time.Now().Round(time.Minute)
mockClock := clock.NewMock()
mockClock.Set(now)
c := testCataloger(t, WithClock(mockClock))
c := testCataloger(t)
defer func() { _ = c.Close() }()

repo := testCatalogerRepo(t, ctx, c, "repo", "master")
Expand Down
8 changes: 4 additions & 4 deletions catalog/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func (c *cataloger) mergeFromParent(tx db.Tx, previousMaxCommitID, nextCommitID

_, err = tx.Exec(`INSERT INTO catalog_commits (branch_id, commit_id, previous_commit_id,committer, message, creation_date, metadata, merge_type, merge_source_branch, merge_source_commit,
lineage_commits)
VALUES ($1,$2,$3,$4,$5,$6,$7,'from_parent',$8,$9,string_to_array($10,',')::bigint[])`,
childID, nextCommitID, previousMaxCommitID, committer, msg, c.clock.Now(), metadata, parentID, parentLastCommitID, childNewLineage)
VALUES ($1,$2,$3,$4,$5,transaction_timestamp(),$6,'from_parent',$7,$8,string_to_array($9,',')::bigint[])`,
childID, nextCommitID, previousMaxCommitID, committer, msg, metadata, parentID, parentLastCommitID, childNewLineage)
if err != nil {
return err
}
Expand Down Expand Up @@ -209,8 +209,8 @@ func (c *cataloger) mergeFromChild(tx db.Tx, previousMaxCommitID, nextCommitID C
return err
}
_, err = tx.Exec(`INSERT INTO catalog_commits (branch_id,commit_id,previous_commit_id,committer,message,creation_date,metadata,merge_type,merge_source_branch,merge_source_commit)
VALUES ($1,$2,$3,$4,$5,$6,$7,'from_child',$8,$9)`,
parentID, nextCommitID, previousMaxCommitID, committer, msg, c.clock.Now(), metadata, childID, childLastCommitID)
VALUES ($1,$2,$3,$4,$5,transaction_timestamp(),$6,'from_child',$7,$8)`,
parentID, nextCommitID, previousMaxCommitID, committer, msg, metadata, childID, childLastCommitID)
return err
}

Expand Down

0 comments on commit f387784

Please sign in to comment.