Skip to content

Commit

Permalink
lakefs new repositories with import branch as parent branch by default (
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Oct 15, 2020
1 parent 1270083 commit 4a51df8
Show file tree
Hide file tree
Showing 36 changed files with 309 additions and 625 deletions.
80 changes: 1 addition & 79 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/treeverse/lakefs/dedup"
"github.com/treeverse/lakefs/httputil"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/onboard"
"github.com/treeverse/lakefs/permissions"
"github.com/treeverse/lakefs/retention"
"github.com/treeverse/lakefs/stats"
Expand Down Expand Up @@ -161,7 +160,6 @@ func (c *Controller) Configure(api *operations.LakefsAPI) {
api.RepositoriesGetRepositoryHandler = c.GetRepoHandler()
api.RepositoriesCreateRepositoryHandler = c.CreateRepositoryHandler()
api.RepositoriesDeleteRepositoryHandler = c.DeleteRepositoryHandler()
api.RepositoriesImportFromS3InventoryHandler = c.ImportFromS3InventoryHandler()

api.BranchesListBranchesHandler = c.ListBranchesHandler()
api.BranchesGetBranchHandler = c.GetBranchHandler()
Expand Down Expand Up @@ -545,7 +543,7 @@ func (c *Controller) CreateRepositoryHandler() repositories.CreateRepositoryHand
return repositories.NewCreateRepositoryBadRequest().
WithPayload(responseError("error creating repository: could not access storage namespace"))
}
err = deps.Cataloger.CreateRepository(c.Context(),
repo, err := deps.Cataloger.CreateRepository(c.Context(),
swag.StringValue(params.Repository.ID),
swag.StringValue(params.Repository.StorageNamespace),
params.Repository.DefaultBranch)
Expand All @@ -554,12 +552,6 @@ func (c *Controller) CreateRepositoryHandler() repositories.CreateRepositoryHand
WithPayload(responseError(fmt.Sprintf("error creating repository: %s", err)))
}

repo, err := deps.Cataloger.GetRepository(c.Context(), swag.StringValue(params.Repository.ID))
if err != nil {
return repositories.NewGetRepositoryDefault(http.StatusInternalServerError).
WithPayload(responseError(fmt.Sprintf("error creating repository: %s", err)))
}

return repositories.NewCreateRepositoryCreated().WithPayload(&models.Repository{
StorageNamespace: repo.StorageNamespace,
CreationDate: repo.CreationDate.Unix(),
Expand Down Expand Up @@ -2234,76 +2226,6 @@ func (c *Controller) RetentionUpdateRetentionPolicyHandler() retentionop.UpdateR
})
}

func (c *Controller) ImportFromS3InventoryHandler() repositories.ImportFromS3InventoryHandler {
return repositories.ImportFromS3InventoryHandlerFunc(func(params repositories.ImportFromS3InventoryParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
{
Action: permissions.CreateRepositoryAction,
Resource: permissions.RepoArn(params.Repository),
},
})
if err != nil {
return repositories.NewImportFromS3InventoryUnauthorized().WithPayload(responseErrorFrom(err))
}
deps.LogAction("import_from_s3_inventory")
userModel, err := c.deps.Auth.GetUser(user.ID)
username := "lakeFS"
if err == nil {
username = userModel.Username
}
importConfig := &onboard.Config{
CommitUsername: username,
InventoryURL: params.ManifestURL,
Repository: params.Repository,
InventoryGenerator: deps.BlockAdapter,
Cataloger: deps.Cataloger,
}
importer, err := onboard.CreateImporter(deps.ctx, deps.logger, importConfig)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
var importStats *onboard.Stats
dryRun := swag.BoolValue(params.DryRun)
if dryRun {
importStats, err = importer.Import(deps.ctx, true)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
} else {
repo, err := deps.Cataloger.GetRepository(c.Context(), params.Repository)
if err != nil {
return repositories.NewImportFromS3InventoryNotFound().
WithPayload(responseErrorFrom(err))
}
_, err = deps.Cataloger.GetBranchReference(deps.ctx, params.Repository, onboard.DefaultBranchName)
if errors.Is(err, db.ErrNotFound) {
_, err = deps.Cataloger.CreateBranch(deps.ctx, params.Repository, onboard.DefaultBranchName, repo.DefaultBranch)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
} else if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
importStats, err = importer.Import(params.HTTPRequest.Context(), false)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
}
return repositories.NewImportFromS3InventoryCreated().WithPayload(&repositories.ImportFromS3InventoryCreatedBody{
IsDryRun: dryRun,
PreviousImportDate: importStats.PreviousImportDate.Unix(),
PreviousManifest: importStats.PreviousInventoryURL,
AddedOrChanged: int64(importStats.AddedOrChanged),
Deleted: int64(importStats.Deleted),
})
})
}

func (c *Controller) ConfigGetConfigHandler() configop.GetConfigHandler {
return configop.GetConfigHandlerFunc(func(params configop.GetConfigParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
Expand Down
86 changes: 51 additions & 35 deletions api/api_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ func TestHandler_ListRepositoriesHandler(t *testing.T) {
t.Run("list some repos", func(t *testing.T) {
// write some repos
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "foo2", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "foo3", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "foo2", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "foo3", "s3://foo1", "master")
testutil.Must(t, err)

resp, err := clt.Repositories.ListRepositories(&repositories.ListRepositoriesParams{},
httptransport.BasicAuth(creds.AccessKeyID, creds.AccessSecretKey))
Expand Down Expand Up @@ -158,8 +161,9 @@ func TestHandler_GetRepoHandler(t *testing.T) {

t.Run("get existing repo", func(t *testing.T) {
const testBranchName = "non-default"
testutil.Must(t,
deps.cataloger.CreateRepository(context.Background(), "foo1", "s3://foo1", testBranchName))
_, err := deps.cataloger.CreateRepository(context.Background(), "foo1", "s3://foo1", testBranchName)
testutil.Must(t, err)

resp, err := clt.Repositories.GetRepository(&repositories.GetRepositoryParams{
Repository: "foo1",
}, httptransport.BasicAuth(creds.AccessKeyID, creds.AccessSecretKey))
Expand Down Expand Up @@ -188,7 +192,7 @@ func TestHandler_CommitsGetBranchCommitLogHandler(t *testing.T) {

ctx := context.Background()
t.Run("get missing branch", func(t *testing.T) {
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand All @@ -202,7 +206,7 @@ func TestHandler_CommitsGetBranchCommitLogHandler(t *testing.T) {
})

t.Run("get branch log", func(t *testing.T) {
err := deps.cataloger.CreateRepository(ctx, "repo2", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo2", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand All @@ -225,7 +229,7 @@ func TestHandler_CommitsGetBranchCommitLogHandler(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error getting log of commits: %s", err)
}
const expectedCommits = commitsLen + 1 // one for the branch creation
const expectedCommits = commitsLen + 2 // one for the branch creation + import branch
commitsLog := resp.GetPayload().Results
if len(commitsLog) != expectedCommits {
t.Fatalf("Log %d commits, expected %d", len(commitsLog), expectedCommits)
Expand Down Expand Up @@ -260,7 +264,7 @@ func TestHandler_GetCommitHandler(t *testing.T) {

t.Run("get existing commit", func(t *testing.T) {
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
testutil.Must(t, err)
testutil.MustDo(t, "create entry bar1", deps.cataloger.CreateEntry(ctx, "foo1", "master",
catalog.Entry{Path: "foo/bar1", PhysicalAddress: "bar1addr", CreationDate: time.Now(), Size: 1, Checksum: "cksum1"},
Expand Down Expand Up @@ -322,13 +326,13 @@ func TestHandler_CommitHandler(t *testing.T) {

t.Run("commit success", func(t *testing.T) {
ctx := context.Background()
testutil.MustDo(t, "create repo foo1",
deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
testutil.MustDo(t, "create repo foo1", err)
testutil.MustDo(t, "commit bar on foo1", deps.cataloger.CreateEntry(ctx, "foo1", "master",
catalog.Entry{Path: "foo/bar", PhysicalAddress: "pa", CreationDate: time.Now(), Size: 666, Checksum: "cs", Metadata: nil},
catalog.CreateEntryParams{},
))
_, err := clt.Commits.Commit(&commits.CommitParams{
_, err = clt.Commits.Commit(&commits.CommitParams{
Branch: "master",
Commit: &models.CommitCreation{
Message: swag.String("some message"),
Expand Down Expand Up @@ -373,7 +377,7 @@ func TestHandler_CreateRepositoryHandler(t *testing.T) {

t.Run("create repo duplicate", func(t *testing.T) {
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo1/", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo1/", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -404,9 +408,10 @@ func TestHandler_DeleteRepositoryHandler(t *testing.T) {

ctx := context.Background()
t.Run("delete repo success", func(t *testing.T) {
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1/", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1/", "master")
testutil.Must(t, err)

_, err := clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
_, err = clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
Repository: "my-new-repo",
}, bauth)

Expand All @@ -431,11 +436,15 @@ func TestHandler_DeleteRepositoryHandler(t *testing.T) {
})

t.Run("delete repo doesnt delete other repos", func(t *testing.T) {
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr0", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr1", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr11", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr2", "s3://foo1", "master"))
_, err := clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
_, err := deps.cataloger.CreateRepository(ctx, "rr0", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "rr1", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "rr11", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "rr2", "s3://foo1", "master")
testutil.Must(t, err)
_, err = clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
Repository: "rr1",
}, bauth)

Expand Down Expand Up @@ -473,22 +482,26 @@ func TestHandler_ListBranchesHandler(t *testing.T) {

t.Run("list branches only default", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master")
testutil.Must(t, err)
resp, err := clt.Branches.ListBranches(&branches.ListBranchesParams{
Amount: swag.Int64(-1),
Repository: "repo1",
}, bauth)
if err != nil {
t.Fatalf("unexpected error listing branches: %s", err)
}
if len(resp.GetPayload().Results) != 1 {
t.Fatalf("expected 1 branch, got %d", len(resp.GetPayload().Results))
const expectedBranchesLen = 2 // branch creation and import branch
branchesLen := len(resp.GetPayload().Results)
if branchesLen != expectedBranchesLen {
t.Fatalf("ListBranches len=%d, expected %d", branchesLen, expectedBranchesLen)
}
})

t.Run("list branches pagination", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo2", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo2", "master")
testutil.Must(t, err)
for i := 0; i < 7; i++ {
branchName := "master" + strconv.Itoa(i+1)
_, err := deps.cataloger.CreateBranch(ctx, "repo2", branchName, "master")
Expand Down Expand Up @@ -550,7 +563,8 @@ func TestHandler_GetBranchHandler(t *testing.T) {
t.Run("get default branch", func(t *testing.T) {
ctx := context.Background()
const testBranch = "master"
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", testBranch))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", testBranch)
testutil.Must(t, err)
resp, err := clt.Branches.GetBranch(&branches.GetBranchParams{
Branch: testBranch,
Repository: "repo1",
Expand Down Expand Up @@ -598,7 +612,8 @@ func TestHandler_CreateBranchHandler(t *testing.T) {

t.Run("create branch success", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master")
testutil.Must(t, err)
const newBranchName = "master2"
resp, err := clt.Branches.CreateBranch(&branches.CreateBranchParams{
Branch: &models.BranchCreation{
Expand Down Expand Up @@ -656,8 +671,9 @@ func TestHandler_DeleteBranchHandler(t *testing.T) {

t.Run("delete branch success", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1", "master"))
_, err := deps.cataloger.CreateBranch(ctx, "my-new-repo", "master2", "master")
_, err := deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateBranch(ctx, "my-new-repo", "master2", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -701,7 +717,7 @@ func TestHandler_ObjectsStatObjectHandler(t *testing.T) {
clt.SetTransport(&handlerTransport{Handler: handler})

ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -790,8 +806,8 @@ func TestHandler_ObjectsListObjectsHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
testutil.Must(t,
deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
testutil.Must(t, err)
testutil.Must(t,
deps.cataloger.CreateEntry(ctx, "repo1", "master", catalog.Entry{
Path: "foo/bar",
Expand Down Expand Up @@ -888,7 +904,7 @@ func TestHandler_ObjectsGetObjectHandler(t *testing.T) {
// setup client
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -995,7 +1011,7 @@ func TestHandler_ObjectsUploadObjectHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1095,7 +1111,7 @@ func TestHandler_ObjectsDeleteObjectHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1265,7 +1281,7 @@ func TestHandler_RetentionPolicyHandlers(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 5 additions & 4 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
)

const (
CatalogerCommitter = ""

DefaultPathDelimiter = "/"
CatalogerCommitter = ""
DefaultBranchName = "master"
DefaultImportBranchName = "import-from-inventory"
DefaultPathDelimiter = "/"

dedupBatchSize = 10
dedupBatchTimeout = 50 * time.Millisecond
Expand Down Expand Up @@ -58,7 +59,7 @@ type ExpireResult struct {
}

type RepositoryCataloger interface {
CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) error
CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) (*Repository, error)
GetRepository(ctx context.Context, repository string) (*Repository, error)
DeleteRepository(ctx context.Context, repository string) error
ListRepositories(ctx context.Context, limit int, after string) ([]*Repository, bool, error)
Expand Down
4 changes: 2 additions & 2 deletions catalog/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func TestCataloger_Commit(t *testing.T) {
name: "simple",
args: args{repository: repository, branch: "master", message: "Simple commit", committer: "tester", metadata: meta},
want: &CommitLog{
Reference: "~KJ8Wd1Rs96Z",
Reference: "~KJ8Wd1Rs96a",
Committer: "tester",
Message: "Simple commit",
CreationDate: time.Now(),
Metadata: meta,
Parents: []string{"~KJ8Wd1Rs96Y"},
Parents: []string{"~KJ8Wd1Rs96Z"},
},
wantErr: false,
},
Expand Down

0 comments on commit 4a51df8

Please sign in to comment.