Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add post-{commit,merge} #857

Merged
merged 6 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ type Merger interface {
Merge(ctx context.Context, repository, leftBranch, rightBranch, committer, message string, metadata Metadata) (*MergeResult, error)
}

type Hookser interface {
Hooks() *CatalogerHooks
}

type ExportConfigurator interface {
GetExportConfigurationForBranch(repository string, branch string) (ExportConfiguration, error)
GetExportConfigurations() ([]ExportConfigurationForBranch, error)
Expand All @@ -172,6 +176,7 @@ type Cataloger interface {
MultipartUpdateCataloger
Differ
Merger
Hookser
ExportConfigurator
io.Closer
}
Expand All @@ -191,6 +196,28 @@ type CacheConfig struct {
Jitter time.Duration
}

// CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are
// called in a current transaction context; if they return an error the transaction is rolled
// back. Because these transactions are current, the hook can see the effect the operation only
// on the passed transaction.
type CatalogerHooks struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a branch filter by name/regex?
o.w. you're relying on the hook itself to filter by it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall not do this. If needed in >1 hooks, we could add a wrapper ("decorator") function that filters a hook implementation for regexp. The added complexity of doing it here would include special semantics for merges -- there are two branches here.
Finally, I'm not sure it will be useful. For exports it's not, because it is hard to precompute the regexp (changes transactionally with the DB) and easy to check directly on the DB.

// PostCommit hooks are called at the end of a commit.
PostCommit []func(ctx context.Context, tx db.Tx, commitLog *CommitLog) error

// PostMerge hooks are called at the end of a merge.
PostMerge []func(ctx context.Context, tx db.Tx, mergeResult *MergeResult) error
}

func (h *CatalogerHooks) AddPostCommit(f func(context.Context, db.Tx, *CommitLog) error) *CatalogerHooks {
h.PostCommit = append(h.PostCommit, f)
return h
}

func (h *CatalogerHooks) AddPostMerge(f func(context.Context, db.Tx, *MergeResult) error) *CatalogerHooks {
h.PostMerge = append(h.PostMerge, f)
return h
}

// cataloger main catalog implementation based on mvcc
type cataloger struct {
params.Catalog
Expand All @@ -202,6 +229,7 @@ type cataloger struct {
dedupReportEnabled bool
dedupReportCh chan *DedupReport
readEntryRequestChan chan *readRequest
hooks CatalogerHooks
}

type CatalogerOption func(*cataloger)
Expand Down Expand Up @@ -423,3 +451,7 @@ func (c *cataloger) dedupBatch(batch []*dedupRequest) {
}
}
}

func (c *cataloger) Hooks() *CatalogerHooks {
return &c.hooks
}
10 changes: 10 additions & 0 deletions catalog/cataloger_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
); err != nil {
return nil, err
}

reference := MakeReference(branch, commitID)
parentReference := MakeReference(branch, lastCommitID)
commitLog := &CommitLog{
Expand All @@ -74,6 +75,15 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
Reference: reference,
Parents: []string{parentReference},
}

for _, hook := range c.hooks.PostCommit {
err = hook(ctx, tx, commitLog)
if err != nil {
// Roll tx back if a hook failed
return nil, err
}
}

return commitLog, nil
}, c.txOpts(ctx)...)
if err != nil {
Expand Down
56 changes: 55 additions & 1 deletion catalog/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/davecgh/go-spew/spew"

"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/testutil"
)

Expand Down Expand Up @@ -283,8 +284,61 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) {
ent, err := c.GetEntry(ctx, repository, branchCommit.Reference, "file42", GetEntryParams{})
testutil.MustDo(t, "get entry from create branch commit - branch1", err)

checksumFile42 := testCreateEntryCalcChecksum("file42", "")
checksumFile42 := testCreateEntryCalcChecksum("file42", t.Name(), "")
if ent.Checksum != checksumFile42 {
t.Fatalf("get entry from branch commit checksum=%s, expected, %s", ent.Checksum, checksumFile42)
}
}

func TestCataloger_CommitHooks(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)

t.Run("commit hooks run and see commit", func(t *testing.T) {
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can take out of the repo creation out of both tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every other test does that too. We can refactor in a separate PR.

checksum := testCatalogerCreateEntry(t, ctx, c, repository, DefaultBranchName, "/file1", nil, "")
var logs [2][]*CommitLog
for i := 0; i < 2; i++ {
j := i
c.Hooks().AddPostCommit(func(_ context.Context, _ db.Tx, l *CommitLog) error {
logs[j] = append(logs[j], l)
return nil
})
}

commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", Metadata{"foo": "bar"})
if err != nil {
t.Fatalf("commit entry: %s", err)
}

for i, log := range logs {
if len(log) != 1 || log[0] != commitLog {
t.Errorf("hook %d: expected one commit %+v but got %+v", i, commitLog, log)
}
}

entry, err := c.GetEntry(ctx, repository, "master:HEAD", "/file1", GetEntryParams{})
if err != nil || entry.Path != "/file1" || entry.Checksum != checksum {
t.Errorf("expected /file1 committed, got %+v, %s", entry, err)
}
})

t.Run("commit hooks can block commit", func(t *testing.T) {
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
testCatalogerCreateEntry(t, ctx, c, repository, "master", "/file1", nil, "")
testingErr := fmt.Errorf("you know, for testing!")
c.Hooks().AddPostCommit(func(_ context.Context, _ db.Tx, _ *CommitLog) error {
return testingErr
})

commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", Metadata{"foo": "bar"})
if !errors.Is(err, testingErr) {
t.Errorf("expected commit to fail with %s but got %v, %s", testingErr, commitLog, err)
}

entry, err := c.GetEntry(ctx, repository, "master:HEAD", "/file1", GetEntryParams{})
if !errors.Is(err, db.ErrNotFound) {
t.Errorf("expected not to find /file1 because its commit rolled back, got %+v, %s", entry, err)
}
})
}
arielshaqed marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion catalog/cataloger_list_entries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func testListEntriesCreateEntries(t *testing.T, ctx context.Context, c Cataloger
for j := 0; j < numEntries; j += skip {
path := fmt.Sprintf("my_entry%03d", j)
seed := strconv.Itoa(i)
checksum := testCreateEntryCalcChecksum(path, seed)
checksum := testCreateEntryCalcChecksum(path, t.Name(), seed)
err := c.CreateEntry(ctx, repo, branch, Entry{Path: path, Checksum: checksum, PhysicalAddress: checksum, Size: int64(i)}, CreateEntryParams{})
if err != nil {
t.Fatalf("Failed to create entry %s on branch %s, repository %s: %s", path, branch, repo, err)
Expand Down
9 changes: 9 additions & 0 deletions catalog/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
return nil, err
}
mergeResult.Reference = MakeReference(rightBranch, commitID)

for _, hook := range c.hooks.PostMerge {
err = hook(ctx, tx, mergeResult)
if err != nil {
// Roll tx back if a hook failed
return nil, err
}
}

return nil, nil
}, c.txOpts(ctx)...)
return mergeResult, err
Expand Down
64 changes: 61 additions & 3 deletions catalog/cataloger_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package catalog
import (
"context"
"errors"
"fmt"
"strconv"
"testing"

"github.com/go-test/deep"

"github.com/davecgh/go-spew/spew"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/testutil"
)

Expand Down Expand Up @@ -86,8 +88,7 @@ func TestCataloger_Merge_FromParentNoChangesInChild(t *testing.T) {
// Difference{Type: DifferenceTypeRemoved, Path: "/file1"},
//}
//if !differences.Equal(expectedDifferences) {
// t.Fatalf("Merge differences = %s, expected %s", spew.Sdump(differences), spew.Sdump(expectedDifferences))
//}
// t.Fatalf("Merge differences = %s, expected %s", spew.Sdump(differences), spew.Sdump(expectedDifferences))}
}

func TestCataloger_Merge_FromParentConflicts(t *testing.T) {
Expand Down Expand Up @@ -1106,7 +1107,7 @@ func TestCataloger_MergeOverDeletedEntries(t *testing.T) {
testutil.MustDo(t, "merge master to b1", err)
ent, err := c.GetEntry(ctx, repository, "b1", "fileX", GetEntryParams{})
testutil.MustDo(t, "get entry again from b1", err)
expectedChecksum := testCreateEntryCalcChecksum("fileX", "master2")
expectedChecksum := testCreateEntryCalcChecksum("fileX", t.Name(), "master2")
if ent.Checksum != expectedChecksum {
t.Fatalf("Get file checksum after merge=%s, expected %s", ent.Checksum, expectedChecksum)
}
Expand Down Expand Up @@ -1200,3 +1201,60 @@ func TestCataloger_MergeFromChildAfterMergeFromParent(t *testing.T) {
t.Fatalf("Merge err=%s, expected none", err)
}
}

func TestCataloger_Merge_Hooks(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me know what you think about the diff test code - if you think it is easier to read, update this one - max I can live with both cases or give you the code for this one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will copy that code over here, you're correct that it's a bit nicer.

for _, success := range []bool{true, false} {
test := "pass"
if !success {
test = "fail"
}
testingErr := fmt.Errorf("you know, for testing!")
t.Run(test, func(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
var gotMergeResult *MergeResult
if success {
c.Hooks().AddPostMerge(
func(_ context.Context, _ db.Tx, mergeResult *MergeResult) error {
if gotMergeResult != nil {
return fmt.Errorf("success merge hook called twice, %+v then %+v", gotMergeResult, mergeResult)
}
gotMergeResult = mergeResult
return nil
})
} else {
c.Hooks().AddPostMerge(
func(_ context.Context, _ db.Tx, _ *MergeResult) error {
return testingErr
})
}

repository := testCatalogerRepo(t, ctx, c, "repo", "master")

// create branch based on master
testCatalogerBranch(t, ctx, c, repository, "branch1", "master")

// create file to merge
testCatalogerCreateEntry(t, ctx, c, repository, "branch1", "/file1", nil, "")
_, err := c.Commit(ctx, repository, "branch1", "commit to master", "tester", nil)
testutil.MustDo(t, "commit to branch1", err)

res, err := c.Merge(ctx, repository, "master", "branch1", "tester", "", nil)
if success {
if err != nil {
t.Error("Merge from master to branch1 failed:", err)
}
testVerifyEntries(t, ctx, c, repository, "branch1", []testEntryInfo{
{Path: "/file1"},
})
if diffs := deep.Equal(res, gotMergeResult); diffs != nil {
t.Error("hook received unexpected merge result: ", diffs)
}
} else {
if !errors.Is(err, testingErr) {
t.Error("hook did not fail merge: ", err)
}
}
})
}
}
12 changes: 7 additions & 5 deletions catalog/cataloger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ func testCatalogerBranch(t testing.TB, ctx context.Context, c Cataloger, reposit
}
}

func testCatalogerCreateEntry(t testing.TB, ctx context.Context, c Cataloger, repository, branch, path string, metadata Metadata, seed string) {
// testCatalogerCreateEntry creates a test entry on cataloger, returning a (fake) checksum based on the path, the test name, and a seed.
func testCatalogerCreateEntry(t testing.TB, ctx context.Context, c Cataloger, repository, branch, path string, metadata Metadata, seed string) string {
t.Helper()
checksum := testCreateEntryCalcChecksum(path, seed)
checksum := testCreateEntryCalcChecksum(path, t.Name(), seed)
var size int64
for i := range checksum {
size += int64(checksum[i])
Expand All @@ -70,6 +71,7 @@ func testCatalogerCreateEntry(t testing.TB, ctx context.Context, c Cataloger, re
if err != nil {
t.Fatalf("Failed to create entry %s on branch %s, repository %s: %s", path, branch, repository, err)
}
return checksum
}

func testCatalogerGetEntry(t testing.TB, ctx context.Context, c Cataloger, repository, reference, path string, expect bool) {
Expand All @@ -83,9 +85,9 @@ func testCatalogerGetEntry(t testing.TB, ctx context.Context, c Cataloger, repos
}
}

func testCreateEntryCalcChecksum(key string, seed string) string {
func testCreateEntryCalcChecksum(key string, testName string, seed string) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the plan was to pass the test name as seed if needed and call this function.
possibly calling this function again for verification.
there is no need to contact the test name when we have seed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that if any 2 tests use the same key and seed then they clash. It caused an actual problem when I copied calls around.

h := sha256.New()
_, _ = h.Write([]byte(seed))
_, _ = h.Write([]byte(testName + seed))
_, _ = h.Write([]byte(key))
checksum := hex.EncodeToString(h.Sum(nil))
return checksum
Expand All @@ -101,7 +103,7 @@ func testVerifyEntries(t testing.TB, ctx context.Context, c Cataloger, repositor
}
} else {
testutil.MustDo(t, fmt.Sprintf("Get entry=%s, repository=%s, reference=%s", entry.Path, repository, reference), err)
expectedAddr := testCreateEntryCalcChecksum(entry.Path, entry.Seed)
expectedAddr := testCreateEntryCalcChecksum(entry.Path, t.Name(), entry.Seed)
if ent.PhysicalAddress != expectedAddr {
t.Fatalf("Get entry %s, addr = %s, expected %s", entry.Path, ent.PhysicalAddress, expectedAddr)
}
Expand Down