diff --git a/api/handler_test.go b/api/handler_test.go index 15677006cea..0be8a117bde 100644 --- a/api/handler_test.go +++ b/api/handler_test.go @@ -21,7 +21,7 @@ import ( authparams "github.com/treeverse/lakefs/auth/params" "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/catalog/mvcc" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/db" dbparams "github.com/treeverse/lakefs/db/params" "github.com/treeverse/lakefs/dedup" @@ -83,7 +83,8 @@ func getHandler(t *testing.T, blockstoreType string, opts ...testutil.GetDBOptio blockstoreType, _ = os.LookupEnv(testutil.EnvKeyUseBlockAdapter) } blockAdapter = testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, blockstoreType) - cataloger := mvcc.NewCataloger(conn, mvcc.WithCacheEnabled(false)) + cataloger := catalogfactory.BuildCataloger(conn, nil) + authService := auth.NewDBAuthService(conn, crypt.NewSecretStore([]byte("some secret")), authparams.ServiceCache{ Enabled: false, }) diff --git a/catalog/factory/build.go b/catalog/factory/build.go new file mode 100644 index 00000000000..bbd0f678214 --- /dev/null +++ b/catalog/factory/build.go @@ -0,0 +1,20 @@ +package factory + +import ( + "github.com/treeverse/lakefs/catalog" + "github.com/treeverse/lakefs/catalog/mvcc" + "github.com/treeverse/lakefs/catalog/rocks" + "github.com/treeverse/lakefs/config" + "github.com/treeverse/lakefs/db" +) + +func BuildCataloger(db db.Database, c *config.Config) catalog.Cataloger { + if c == nil { + return mvcc.NewCataloger(db, mvcc.WithCacheEnabled(false)) + } + catType := c.GetCatalogerType() + if catType == "rocks" { + return rocks.NewCataloger() + } + return mvcc.NewCataloger(db, mvcc.WithParams(c.GetMvccCatalogerCatalogParams())) +} diff --git a/catalog/rocks/cataloger.go b/catalog/rocks/cataloger.go new file mode 100644 index 00000000000..655f36741b9 --- /dev/null +++ b/catalog/rocks/cataloger.go @@ -0,0 +1,192 @@ +package rocks + +import ( + "context" + "time" + + "github.com/treeverse/lakefs/catalog" +) + +type cataloger struct{} + +func NewCataloger() catalog.Cataloger { + return &cataloger{} +} + +// CreateRepository create a new repository pointing to 'storageNamespace' (ex: s3://bucket1/repo) with default branch name 'branch' +func (c *cataloger) CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) (*catalog.Repository, error) { + panic("not implemented") // TODO: Implement +} + +// GetRepository get repository information +func (c *cataloger) GetRepository(ctx context.Context, repository string) (*catalog.Repository, error) { + panic("not implemented") // TODO: Implement +} + +// DeleteRepository delete a repository +func (c *cataloger) DeleteRepository(ctx context.Context, repository string) error { + panic("not implemented") // TODO: Implement +} + +// ListRepositories list repositories information, the bool returned is true when more repositories can be listed. +// In this case pass the last repository name as 'after' on the next call to ListRepositories +func (c *cataloger) ListRepositories(ctx context.Context, limit int, after string) ([]*catalog.Repository, bool, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) CreateBranch(ctx context.Context, repository string, branch string, sourceBranch string) (*catalog.CommitLog, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) DeleteBranch(ctx context.Context, repository string, branch string) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) ListBranches(ctx context.Context, repository string, prefix string, limit int, after string) ([]*catalog.Branch, bool, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) BranchExists(ctx context.Context, repository string, branch string) (bool, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) GetBranchReference(ctx context.Context, repository string, branch string) (string, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) ResetBranch(ctx context.Context, repository string, branch string) error { + panic("not implemented") // TODO: Implement +} + +// GetEntry returns the current entry for path in repository branch reference. Returns +// the entry with ExpiredError if it has expired from underlying storage. +func (c *cataloger) GetEntry(ctx context.Context, repository string, reference string, path string, params catalog.GetEntryParams) (*catalog.Entry, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) CreateEntry(ctx context.Context, repository string, branch string, entry catalog.Entry, params catalog.CreateEntryParams) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) CreateEntries(ctx context.Context, repository string, branch string, entries []catalog.Entry) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) DeleteEntry(ctx context.Context, repository string, branch string, path string) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) ListEntries(ctx context.Context, repository string, reference string, prefix string, after string, delimiter string, limit int) ([]*catalog.Entry, bool, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) ResetEntry(ctx context.Context, repository string, branch string, path string) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) ResetEntries(ctx context.Context, repository string, branch string, prefix string) error { + panic("not implemented") // TODO: Implement +} + +// QueryEntriesToExpire returns ExpiryRows iterating over all objects to expire on +// repositoryName according to policy. +func (c *cataloger) QueryEntriesToExpire(ctx context.Context, repositoryName string, policy *catalog.Policy) (catalog.ExpiryRows, error) { + panic("not implemented") // TODO: Implement +} + +// MarkEntriesExpired marks all entries identified by expire as expired. It is a batch operation. +func (c *cataloger) MarkEntriesExpired(ctx context.Context, repositoryName string, expireResults []*catalog.ExpireResult) error { + panic("not implemented") // TODO: Implement +} + +// MarkObjectsForDeletion marks objects in catalog_object_dedup as "deleting" if all +// their entries are expired, and returns the new total number of objects marked (or an +// error). These objects are not yet safe to delete: there could be a race between +// marking objects as expired deduping newly-uploaded objects. See +// DeleteOrUnmarkObjectsForDeletion for that actual deletion. +func (c *cataloger) MarkObjectsForDeletion(ctx context.Context, repositoryName string) (int64, error) { + panic("not implemented") // TODO: Implement +} + +// DeleteOrUnmarkObjectsForDeletion scans objects in catalog_object_dedup for objects +// marked "deleting" and returns an iterator over physical addresses of those objects +// all of whose referring entries are still expired. If called after MarkEntriesExpired +// and MarkObjectsForDeletion this is safe, because no further entries can refer to +// expired objects. It also removes the "deleting" mark from those objects that have an +// entry _not_ marked as expiring and therefore were not on the returned rows. +func (c *cataloger) DeleteOrUnmarkObjectsForDeletion(ctx context.Context, repositoryName string) (catalog.StringIterator, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) DedupReportChannel() chan *catalog.DedupReport { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) CreateMultipartUpload(ctx context.Context, repository string, uploadID string, path string, physicalAddress string, creationTime time.Time) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) GetMultipartUpload(ctx context.Context, repository string, uploadID string) (*catalog.MultipartUpload, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) DeleteMultipartUpload(ctx context.Context, repository string, uploadID string) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) Commit(ctx context.Context, repository string, branch string, message string, committer string, metadata catalog.Metadata) (*catalog.CommitLog, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) GetCommit(ctx context.Context, repository string, reference string) (*catalog.CommitLog, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) ListCommits(ctx context.Context, repository string, branch string, fromReference string, limit int) ([]*catalog.CommitLog, bool, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) RollbackCommit(ctx context.Context, repository string, reference string) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) Diff(ctx context.Context, repository string, leftReference string, rightReference string, params catalog.DiffParams) (catalog.Differences, bool, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) DiffUncommitted(ctx context.Context, repository string, branch string, limit int, after string) (catalog.Differences, bool, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) Merge(ctx context.Context, repository string, leftBranch string, rightBranch string, committer string, message string, metadata catalog.Metadata) (*catalog.MergeResult, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) Hooks() *catalog.CatalogerHooks { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (catalog.ExportConfiguration, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) GetExportConfigurations() ([]catalog.ExportConfigurationForBranch, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) PutExportConfiguration(repository string, branch string, conf *catalog.ExportConfiguration) error { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) ExportStateSet(repo string, branch string, cb catalog.ExportStateCallback) error { + panic("not implemented") // TODO: Implement +} + +// GetExportState returns the current Export state params +func (c *cataloger) GetExportState(repo string, branch string) (catalog.ExportState, error) { + panic("not implemented") // TODO: Implement +} + +func (c *cataloger) Close() error { + panic("not implemented") // TODO: Implement +} diff --git a/cmd/lakefs-loadtest/cmd/entry.go b/cmd/lakefs-loadtest/cmd/entry.go index b8c6fe4d5cf..e229e3f8f6d 100644 --- a/cmd/lakefs-loadtest/cmd/entry.go +++ b/cmd/lakefs-loadtest/cmd/entry.go @@ -10,14 +10,13 @@ import ( "sync/atomic" "time" - "github.com/treeverse/lakefs/catalog/mvcc" - "github.com/google/uuid" "github.com/jamiealquiza/tachymeter" nanoid "github.com/matoous/go-nanoid" "github.com/schollz/progressbar/v3" "github.com/spf13/cobra" "github.com/treeverse/lakefs/catalog" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/cmdutils" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/uri" @@ -56,7 +55,7 @@ var entryCmd = &cobra.Command{ defer database.Close() conf := config.NewConfig() - c := mvcc.NewCataloger(database, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams())) + c := catalogfactory.BuildCataloger(database, conf) // validate repository and branch _, err := c.GetRepository(ctx, u.Repository) diff --git a/cmd/lakefs/cmd/diagnose.go b/cmd/lakefs/cmd/diagnose.go index c9937661163..d20fbfaee2a 100644 --- a/cmd/lakefs/cmd/diagnose.go +++ b/cmd/lakefs/cmd/diagnose.go @@ -6,8 +6,7 @@ import ( "github.com/spf13/cobra" "github.com/treeverse/lakefs/block/factory" - "github.com/treeverse/lakefs/catalog/mvcc" - "github.com/treeverse/lakefs/config" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" ) @@ -18,14 +17,13 @@ var diagnoseCmd = &cobra.Command{ Short: "Diagnose underlying infrastructure configuration", Run: func(cmd *cobra.Command, args []string) { ctx := context.Background() - conf := config.NewConfig() logger := logging.Default().WithContext(ctx) dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams()) adapter, err := factory.BuildBlockAdapter(cfg) if err != nil { logger.WithError(err).Fatal("Failed to create block adapter") } - cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams())) + cataloger := catalogfactory.BuildCataloger(dbPool, cfg) numFailures := 0 repos, _, err := cataloger.ListRepositories(ctx, -1, "") diff --git a/cmd/lakefs/cmd/expire.go b/cmd/lakefs/cmd/expire.go index 85055bb55da..a3e0645e7d8 100644 --- a/cmd/lakefs/cmd/expire.go +++ b/cmd/lakefs/cmd/expire.go @@ -5,13 +5,11 @@ import ( "fmt" "net/url" - "github.com/treeverse/lakefs/catalog/mvcc" - "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3control" "github.com/spf13/cobra" - + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" @@ -24,10 +22,9 @@ var expireCmd = &cobra.Command{ Short: "Apply configured retention policies to expire objects", Run: func(cmd *cobra.Command, args []string) { ctx := context.Background() - conf := config.NewConfig() logger := logging.FromContext(ctx) dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams()) - cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams())) + cataloger := catalogfactory.BuildCataloger(dbPool, cfg) awsRetentionConfig := config.NewConfig().GetAwsS3RetentionConfig() diff --git a/cmd/lakefs/cmd/import.go b/cmd/lakefs/cmd/import.go index 5db0475a990..c8b3a959b5f 100644 --- a/cmd/lakefs/cmd/import.go +++ b/cmd/lakefs/cmd/import.go @@ -8,12 +8,11 @@ import ( "os" "strings" - "github.com/treeverse/lakefs/catalog/mvcc" - "github.com/jedib0t/go-pretty/text" "github.com/spf13/cobra" "github.com/treeverse/lakefs/block/factory" "github.com/treeverse/lakefs/catalog" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/cmdutils" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/db" @@ -60,7 +59,7 @@ var importCmd = &cobra.Command{ dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams()) defer dbPool.Close() - cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams())) + cataloger := catalogfactory.BuildCataloger(dbPool, cfg) defer func() { _ = cataloger.Close() }() u := uri.Must(uri.Parse(args[0])) diff --git a/cmd/lakefs/cmd/migrate.go b/cmd/lakefs/cmd/migrate.go index 6d04b25cf71..c70fb922c19 100644 --- a/cmd/lakefs/cmd/migrate.go +++ b/cmd/lakefs/cmd/migrate.go @@ -4,9 +4,8 @@ import ( "fmt" "os" - "github.com/treeverse/lakefs/db" - "github.com/spf13/cobra" + "github.com/treeverse/lakefs/db" ) // migrateCmd represents the migrate command diff --git a/cmd/lakefs/cmd/run.go b/cmd/lakefs/cmd/run.go index 3a36a7b9e66..917d6fad550 100644 --- a/cmd/lakefs/cmd/run.go +++ b/cmd/lakefs/cmd/run.go @@ -19,7 +19,7 @@ import ( "github.com/treeverse/lakefs/auth" "github.com/treeverse/lakefs/auth/crypt" "github.com/treeverse/lakefs/block/factory" - "github.com/treeverse/lakefs/catalog/mvcc" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/dedup" @@ -49,7 +49,6 @@ var runCmd = &cobra.Command{ Use: "run", Short: "Run lakeFS", Run: func(cmd *cobra.Command, args []string) { - conf := config.NewConfig() logger := logging.Default() logger.WithField("version", config.Version).Infof("lakeFS run") @@ -65,12 +64,12 @@ var runCmd = &cobra.Command{ } dbPool := db.BuildDatabaseConnection(dbParams) defer dbPool.Close() + registerPrometheusCollector(dbPool) retention := retention.NewService(dbPool) migrator := db.NewDatabaseMigrator(dbParams) - // init catalog - cataloger := mvcc.NewCataloger(dbPool, mvcc.WithParams(conf.GetMvccCatalogerCatalogParams())) + cataloger := catalogfactory.BuildCataloger(dbPool, cfg) // init block store blockStore, err := factory.BuildBlockAdapter(cfg) diff --git a/config/config.go b/config/config.go index 83d7e936f28..4d8d3640a1b 100644 --- a/config/config.go +++ b/config/config.go @@ -107,6 +107,10 @@ func (c *Config) GetDatabaseParams() dbparams.Database { } } +func (c *Config) GetCatalogerType() string { + return viper.GetString("cataloger.type") +} + func (c *Config) GetMvccCatalogerCatalogParams() catalogparams.Catalog { return catalogparams.Catalog{ BatchRead: catalogparams.BatchRead{ diff --git a/gateway/playback_test.go b/gateway/playback_test.go index 432d292a992..fc48970622f 100644 --- a/gateway/playback_test.go +++ b/gateway/playback_test.go @@ -14,11 +14,10 @@ import ( "strings" "testing" - "github.com/treeverse/lakefs/catalog/mvcc" - "github.com/ory/dockertest/v3" "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/catalog" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/dedup" "github.com/treeverse/lakefs/gateway" "github.com/treeverse/lakefs/gateway/simulator" @@ -112,7 +111,7 @@ func getBasicHandler(t *testing.T, authService *simulator.PlayBackMockConf) (htt } conn, _ := testutil.GetDB(t, databaseURI) - cataloger := mvcc.NewCataloger(conn) + cataloger := catalogfactory.BuildCataloger(conn, nil) blockstoreType, _ := os.LookupEnv(testutil.EnvKeyUseBlockAdapter) blockAdapter := testutil.NewBlockAdapterByType(t, IdTranslator, blockstoreType) diff --git a/loadtest/local_load_test.go b/loadtest/local_load_test.go index f8f71288302..cc573851b28 100644 --- a/loadtest/local_load_test.go +++ b/loadtest/local_load_test.go @@ -15,7 +15,7 @@ import ( authmodel "github.com/treeverse/lakefs/auth/model" authparams "github.com/treeverse/lakefs/auth/params" "github.com/treeverse/lakefs/block" - "github.com/treeverse/lakefs/catalog/mvcc" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/db" dbparams "github.com/treeverse/lakefs/db/params" "github.com/treeverse/lakefs/dedup" @@ -56,7 +56,7 @@ func TestLocalLoad(t *testing.T) { conn, _ := testutil.GetDB(t, databaseURI) blockstoreType, _ := os.LookupEnv(testutil.EnvKeyUseBlockAdapter) blockAdapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, blockstoreType) - cataloger := mvcc.NewCataloger(conn) + cataloger := catalogfactory.BuildCataloger(conn, nil) authService := auth.NewDBAuthService(conn, crypt.NewSecretStore([]byte("some secret")), authparams.ServiceCache{}) retentionService := retention.NewService(conn) meta := auth.NewDBMetadataManager("dev", conn) diff --git a/retention/service_test.go b/retention/service_test.go index 4b00da6c856..178becf2e60 100644 --- a/retention/service_test.go +++ b/retention/service_test.go @@ -9,7 +9,7 @@ import ( "github.com/go-test/deep" "github.com/ory/dockertest/v3" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/catalog/mvcc" + catalogfactory "github.com/treeverse/lakefs/catalog/factory" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/retention" "github.com/treeverse/lakefs/testutil" @@ -37,7 +37,7 @@ func setupService(t *testing.T, opts ...testutil.GetDBOption) *retention.DBReten t.Helper() ctx := context.Background() cdb, _ := testutil.GetDB(t, databaseURI, opts...) - cataloger := mvcc.NewCataloger(cdb) + cataloger := catalogfactory.BuildCataloger(cdb, nil) _, err := cataloger.CreateRepository(ctx, "repo", "s3://repo", "master") testutil.MustDo(t, "create repository", err) return retention.NewDBRetentionService(cdb)