Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
authparams "github.com/treeverse/lakefs/auth/params"
"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/catalog/mvcc"
catalogfactory "github.com/treeverse/lakefs/catalog/factory"
"github.com/treeverse/lakefs/db"
dbparams "github.com/treeverse/lakefs/db/params"
"github.com/treeverse/lakefs/dedup"
Expand Down Expand Up @@ -83,7 +83,8 @@ func getHandler(t *testing.T, blockstoreType string, opts ...testutil.GetDBOptio
blockstoreType, _ = os.LookupEnv(testutil.EnvKeyUseBlockAdapter)
}
blockAdapter = testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, blockstoreType)
cataloger := mvcc.NewCataloger(conn, mvcc.WithCacheEnabled(false))
cataloger := catalogfactory.BuildCataloger(conn, nil)

authService := auth.NewDBAuthService(conn, crypt.NewSecretStore([]byte("some secret")), authparams.ServiceCache{
Enabled: false,
})
Expand Down
20 changes: 20 additions & 0 deletions catalog/factory/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package factory

import (
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/catalog/mvcc"
"github.com/treeverse/lakefs/catalog/rocks"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
)

// BuildCataloger picks and constructs the catalog.Cataloger implementation to use.
//
// When c is nil, an mvcc cataloger is built on db with mvcc.WithCacheEnabled(false).
// Otherwise the value of c.GetCatalogerType() decides: "rocks" yields the rocks
// cataloger (constructed without db), and any other value yields an mvcc cataloger
// built on db with the params returned by c.GetMvccCatalogerCatalogParams().
func BuildCataloger(db db.Database, c *config.Config) catalog.Cataloger {
	if c == nil {
		return mvcc.NewCataloger(db, mvcc.WithCacheEnabled(false))
	}
	switch c.GetCatalogerType() {
	case "rocks":
		return rocks.NewCataloger()
	default:
		// mvcc is the fallback for every unrecognized (or unset) type.
		return mvcc.NewCataloger(db, mvcc.WithParams(c.GetMvccCatalogerCatalogParams()))
	}
}
192 changes: 192 additions & 0 deletions catalog/rocks/cataloger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package rocks

import (
"context"
"time"

"github.com/treeverse/lakefs/catalog"
)

// cataloger is the rocks package's catalog.Cataloger. It holds no state.
type cataloger struct{}

// NewCataloger returns a fresh rocks cataloger as a catalog.Cataloger.
func NewCataloger() catalog.Cataloger {
	return new(cataloger)
}

// CreateRepository is intended to create a new repository pointing to
// 'storageNamespace' (ex: s3://bucket1/repo) whose default branch is named 'branch'.
// It is not implemented yet and panics when called.
func (c *cataloger) CreateRepository(ctx context.Context, repository, storageNamespace, branch string) (*catalog.Repository, error) {
	// TODO: Implement
	panic("not implemented")
}

// GetRepository is intended to return the repository's information.
// It is not implemented yet and panics when called.
func (c *cataloger) GetRepository(ctx context.Context, repository string) (*catalog.Repository, error) {
	// TODO: Implement
	panic("not implemented")
}

// DeleteRepository is intended to delete a repository.
// It is not implemented yet and panics when called.
func (c *cataloger) DeleteRepository(ctx context.Context, repository string) error {
	// TODO: Implement
	panic("not implemented")
}

// ListRepositories is intended to list repositories' information. The returned bool
// is true when more repositories can be listed; in that case, pass the last
// repository name as 'after' on the next call to ListRepositories.
// It is not implemented yet and panics when called.
func (c *cataloger) ListRepositories(ctx context.Context, limit int, after string) ([]*catalog.Repository, bool, error) {
	// TODO: Implement
	panic("not implemented")
}

// CreateBranch is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) CreateBranch(ctx context.Context, repository, branch, sourceBranch string) (*catalog.CommitLog, error) {
	// TODO: Implement
	panic("not implemented")
}

// DeleteBranch is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) DeleteBranch(ctx context.Context, repository, branch string) error {
	// TODO: Implement
	panic("not implemented")
}

// ListBranches is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) ListBranches(ctx context.Context, repository, prefix string, limit int, after string) ([]*catalog.Branch, bool, error) {
	// TODO: Implement
	panic("not implemented")
}

// BranchExists is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) BranchExists(ctx context.Context, repository, branch string) (bool, error) {
	// TODO: Implement
	panic("not implemented")
}

// GetBranchReference is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) GetBranchReference(ctx context.Context, repository, branch string) (string, error) {
	// TODO: Implement
	panic("not implemented")
}

// ResetBranch is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) ResetBranch(ctx context.Context, repository, branch string) error {
	// TODO: Implement
	panic("not implemented")
}

// GetEntry is intended to return the current entry for path in the given repository
// branch reference, returning the entry together with ExpiredError if it has expired
// from underlying storage.
// It is not implemented yet and panics when called.
func (c *cataloger) GetEntry(ctx context.Context, repository, reference, path string, params catalog.GetEntryParams) (*catalog.Entry, error) {
	// TODO: Implement
	panic("not implemented")
}

// CreateEntry is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) CreateEntry(ctx context.Context, repository, branch string, entry catalog.Entry, params catalog.CreateEntryParams) error {
	// TODO: Implement
	panic("not implemented")
}

// CreateEntries is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) CreateEntries(ctx context.Context, repository, branch string, entries []catalog.Entry) error {
	// TODO: Implement
	panic("not implemented")
}

// DeleteEntry is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) DeleteEntry(ctx context.Context, repository, branch, path string) error {
	// TODO: Implement
	panic("not implemented")
}

// ListEntries is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) ListEntries(ctx context.Context, repository, reference, prefix, after, delimiter string, limit int) ([]*catalog.Entry, bool, error) {
	// TODO: Implement
	panic("not implemented")
}

// ResetEntry is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) ResetEntry(ctx context.Context, repository, branch, path string) error {
	// TODO: Implement
	panic("not implemented")
}

// ResetEntries is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) ResetEntries(ctx context.Context, repository, branch, prefix string) error {
	// TODO: Implement
	panic("not implemented")
}

// QueryEntriesToExpire is intended to return ExpiryRows that iterate over all objects
// to expire on repositoryName according to policy.
// It is not implemented yet and panics when called.
func (c *cataloger) QueryEntriesToExpire(ctx context.Context, repositoryName string, policy *catalog.Policy) (catalog.ExpiryRows, error) {
	// TODO: Implement
	panic("not implemented")
}

// MarkEntriesExpired is intended to mark, as a batch operation, every entry
// identified by expireResults as expired.
// It is not implemented yet and panics when called.
func (c *cataloger) MarkEntriesExpired(ctx context.Context, repositoryName string, expireResults []*catalog.ExpireResult) error {
	// TODO: Implement
	panic("not implemented")
}

// MarkObjectsForDeletion is intended to mark objects in catalog_object_dedup as
// "deleting" when all of their entries are expired, and to return the new total
// number of objects marked (or an error). Objects marked this way are not yet safe to
// delete: marking objects as expired can race with deduping newly-uploaded objects.
// The actual deletion is handled by DeleteOrUnmarkObjectsForDeletion.
// It is not implemented yet and panics when called.
func (c *cataloger) MarkObjectsForDeletion(ctx context.Context, repositoryName string) (int64, error) {
	// TODO: Implement
	panic("not implemented")
}

// DeleteOrUnmarkObjectsForDeletion is intended to scan catalog_object_dedup for
// objects marked "deleting" and return an iterator over the physical addresses of
// those objects whose referring entries are all still expired. When called after
// MarkEntriesExpired and MarkObjectsForDeletion this is safe, because no further
// entries can refer to expired objects. Objects that have an entry _not_ marked as
// expiring are left out of the returned rows and have their "deleting" mark removed.
// It is not implemented yet and panics when called.
func (c *cataloger) DeleteOrUnmarkObjectsForDeletion(ctx context.Context, repositoryName string) (catalog.StringIterator, error) {
	// TODO: Implement
	panic("not implemented")
}

// DedupReportChannel is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) DedupReportChannel() chan *catalog.DedupReport {
	// TODO: Implement
	panic("not implemented")
}

// CreateMultipartUpload is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) CreateMultipartUpload(ctx context.Context, repository, uploadID, path, physicalAddress string, creationTime time.Time) error {
	// TODO: Implement
	panic("not implemented")
}

// GetMultipartUpload is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) GetMultipartUpload(ctx context.Context, repository, uploadID string) (*catalog.MultipartUpload, error) {
	// TODO: Implement
	panic("not implemented")
}

// DeleteMultipartUpload is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) DeleteMultipartUpload(ctx context.Context, repository, uploadID string) error {
	// TODO: Implement
	panic("not implemented")
}

// Commit is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) Commit(ctx context.Context, repository, branch, message, committer string, metadata catalog.Metadata) (*catalog.CommitLog, error) {
	// TODO: Implement
	panic("not implemented")
}

// GetCommit is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) GetCommit(ctx context.Context, repository, reference string) (*catalog.CommitLog, error) {
	// TODO: Implement
	panic("not implemented")
}

// ListCommits is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) ListCommits(ctx context.Context, repository, branch, fromReference string, limit int) ([]*catalog.CommitLog, bool, error) {
	// TODO: Implement
	panic("not implemented")
}

// RollbackCommit is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) RollbackCommit(ctx context.Context, repository, reference string) error {
	// TODO: Implement
	panic("not implemented")
}

// Diff is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) Diff(ctx context.Context, repository, leftReference, rightReference string, params catalog.DiffParams) (catalog.Differences, bool, error) {
	// TODO: Implement
	panic("not implemented")
}

// DiffUncommitted is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) DiffUncommitted(ctx context.Context, repository, branch string, limit int, after string) (catalog.Differences, bool, error) {
	// TODO: Implement
	panic("not implemented")
}

// Merge is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBranch, committer, message string, metadata catalog.Metadata) (*catalog.MergeResult, error) {
	// TODO: Implement
	panic("not implemented")
}

// Hooks is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) Hooks() *catalog.CatalogerHooks {
	// TODO: Implement
	panic("not implemented")
}

// GetExportConfigurationForBranch is a placeholder: it is not implemented yet and
// panics when called.
func (c *cataloger) GetExportConfigurationForBranch(repository, branch string) (catalog.ExportConfiguration, error) {
	// TODO: Implement
	panic("not implemented")
}

// GetExportConfigurations is a placeholder: it is not implemented yet and panics
// when called.
func (c *cataloger) GetExportConfigurations() ([]catalog.ExportConfigurationForBranch, error) {
	// TODO: Implement
	panic("not implemented")
}

// PutExportConfiguration is a placeholder: it is not implemented yet and panics
// when called.
func (c *cataloger) PutExportConfiguration(repository, branch string, conf *catalog.ExportConfiguration) error {
	// TODO: Implement
	panic("not implemented")
}

// ExportStateSet is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) ExportStateSet(repo, branch string, cb catalog.ExportStateCallback) error {
	// TODO: Implement
	panic("not implemented")
}

// GetExportState is intended to return the current export state params.
// It is not implemented yet and panics when called.
func (c *cataloger) GetExportState(repo, branch string) (catalog.ExportState, error) {
	// TODO: Implement
	panic("not implemented")
}

// Close is a placeholder: it is not implemented yet and panics when called.
func (c *cataloger) Close() error {
	// TODO: Implement
	panic("not implemented")
}
5 changes: 2 additions & 3 deletions cmd/lakefs-loadtest/cmd/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import (
"sync/atomic"
"time"

"github.com/treeverse/lakefs/catalog/mvcc"

"github.com/google/uuid"
"github.com/jamiealquiza/tachymeter"
nanoid "github.com/matoous/go-nanoid"
"github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/catalog"
catalogfactory "github.com/treeverse/lakefs/catalog/factory"
"github.com/treeverse/lakefs/cmdutils"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/uri"
Expand Down Expand Up @@ -56,7 +55,7 @@ var entryCmd = &cobra.Command{
defer database.Close()

conf := config.NewConfig()
c := mvcc.NewCataloger(database, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams()))
c := catalogfactory.BuildCataloger(database, conf)

// validate repository and branch
_, err := c.GetRepository(ctx, u.Repository)
Expand Down
6 changes: 2 additions & 4 deletions cmd/lakefs/cmd/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/block/factory"
"github.com/treeverse/lakefs/catalog/mvcc"
"github.com/treeverse/lakefs/config"
catalogfactory "github.com/treeverse/lakefs/catalog/factory"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
)
Expand All @@ -18,14 +17,13 @@ var diagnoseCmd = &cobra.Command{
Short: "Diagnose underlying infrastructure configuration",
Run: func(cmd *cobra.Command, args []string) {
ctx := context.Background()
conf := config.NewConfig()
logger := logging.Default().WithContext(ctx)
dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams())
adapter, err := factory.BuildBlockAdapter(cfg)
if err != nil {
logger.WithError(err).Fatal("Failed to create block adapter")
}
cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams()))
cataloger := catalogfactory.BuildCataloger(dbPool, cfg)

numFailures := 0
repos, _, err := cataloger.ListRepositories(ctx, -1, "")
Expand Down
7 changes: 2 additions & 5 deletions cmd/lakefs/cmd/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"fmt"
"net/url"

"github.com/treeverse/lakefs/catalog/mvcc"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3control"
"github.com/spf13/cobra"

catalogfactory "github.com/treeverse/lakefs/catalog/factory"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
Expand All @@ -24,10 +22,9 @@ var expireCmd = &cobra.Command{
Short: "Apply configured retention policies to expire objects",
Run: func(cmd *cobra.Command, args []string) {
ctx := context.Background()
conf := config.NewConfig()
logger := logging.FromContext(ctx)
dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams())
cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams()))
cataloger := catalogfactory.BuildCataloger(dbPool, cfg)

awsRetentionConfig := config.NewConfig().GetAwsS3RetentionConfig()

Expand Down
5 changes: 2 additions & 3 deletions cmd/lakefs/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"os"
"strings"

"github.com/treeverse/lakefs/catalog/mvcc"

"github.com/jedib0t/go-pretty/text"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/block/factory"
"github.com/treeverse/lakefs/catalog"
catalogfactory "github.com/treeverse/lakefs/catalog/factory"
"github.com/treeverse/lakefs/cmdutils"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
Expand Down Expand Up @@ -60,7 +59,7 @@ var importCmd = &cobra.Command{
dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams())
defer dbPool.Close()

cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams()))
cataloger := catalogfactory.BuildCataloger(dbPool, cfg)
defer func() { _ = cataloger.Close() }()

u := uri.Must(uri.Parse(args[0]))
Expand Down
3 changes: 1 addition & 2 deletions cmd/lakefs/cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import (
"fmt"
"os"

"github.com/treeverse/lakefs/db"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/db"
)

// migrateCmd represents the migrate command
Expand Down
7 changes: 3 additions & 4 deletions cmd/lakefs/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/treeverse/lakefs/auth"
"github.com/treeverse/lakefs/auth/crypt"
"github.com/treeverse/lakefs/block/factory"
"github.com/treeverse/lakefs/catalog/mvcc"
catalogfactory "github.com/treeverse/lakefs/catalog/factory"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/dedup"
Expand Down Expand Up @@ -49,7 +49,6 @@ var runCmd = &cobra.Command{
Use: "run",
Short: "Run lakeFS",
Run: func(cmd *cobra.Command, args []string) {
conf := config.NewConfig()
logger := logging.Default()
logger.WithField("version", config.Version).Infof("lakeFS run")

Expand All @@ -65,12 +64,12 @@ var runCmd = &cobra.Command{
}
dbPool := db.BuildDatabaseConnection(dbParams)
defer dbPool.Close()

registerPrometheusCollector(dbPool)
retention := retention.NewService(dbPool)
migrator := db.NewDatabaseMigrator(dbParams)

// init catalog
cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams()))
cataloger := catalogfactory.BuildCataloger(dbPool, cfg)

// init block store
blockStore, err := factory.BuildBlockAdapter(cfg)
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (c *Config) GetDatabaseParams() dbparams.Database {
}
}

// GetCatalogerType returns the string stored under the "cataloger.type"
// configuration key, as read through viper.
func (c *Config) GetCatalogerType() string {
	const catalogerTypeKey = "cataloger.type"
	return viper.GetString(catalogerTypeKey)
}

func (c *Config) GetMvccCatalogerCatalogParams() catalogparams.Catalog {
return catalogparams.Catalog{
BatchRead: catalogparams.BatchRead{
Expand Down
Loading