diff --git a/pkg/gateway/testutil/gateway_setup.go b/pkg/gateway/testutil/gateway_setup.go index 7d247d44dbf..595817a7ede 100644 --- a/pkg/gateway/testutil/gateway_setup.go +++ b/pkg/gateway/testutil/gateway_setup.go @@ -15,8 +15,8 @@ import ( "github.com/treeverse/lakefs/pkg/config" "github.com/treeverse/lakefs/pkg/gateway" "github.com/treeverse/lakefs/pkg/gateway/multipart" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" - "github.com/treeverse/lakefs/pkg/kv/kvtest" _ "github.com/treeverse/lakefs/pkg/kv/mem" "github.com/treeverse/lakefs/pkg/logging" "github.com/treeverse/lakefs/pkg/stats" @@ -34,7 +34,8 @@ func GetBasicHandler(t *testing.T, authService *FakeAuthService, repoName string ctx := context.Background() viper.Set(config.BlockstoreTypeKey, block.BlockstoreTypeMem) - store := kvtest.MakeStoreByName("mem", kvparams.Config{})(t, ctx) + store, err := kv.Open(ctx, kvparams.Config{Type: "mem"}) + testutil.MustDo(t, "open kv store", err) defer store.Close() multipartTracker := multipart.NewTracker(store) diff --git a/pkg/kv/cosmosdb/main_test.go b/pkg/kv/cosmosdb/main_test.go index 43f6baa6a9b..6a9abebd463 100644 --- a/pkg/kv/cosmosdb/main_test.go +++ b/pkg/kv/cosmosdb/main_test.go @@ -1,14 +1,32 @@ package cosmosdb_test import ( + "context" "os" "testing" + "github.com/treeverse/lakefs/pkg/kv" + "github.com/treeverse/lakefs/pkg/kv/cosmosdb" "github.com/treeverse/lakefs/pkg/kv/kvparams" + "github.com/treeverse/lakefs/pkg/kv/kvtest" ) var testParams *kvparams.CosmosDB +func TestCosmosDB(t *testing.T) { + t.Skip("CosmosDB tests are flaky due to the emulator. If you plan on running those, make sure to assign at least 3CPUs and" + + " 4GB of memory to the container running the emulator.") + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{CosmosDB: testParams, Type: cosmosdb.DriverName}) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", cosmosdb.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) +} + func TestMain(m *testing.M) { code := m.Run() os.Exit(code) diff --git a/pkg/kv/cosmosdb/store_test.go b/pkg/kv/cosmosdb/store_test.go deleted file mode 100644 index 7d3839314f3..00000000000 --- a/pkg/kv/cosmosdb/store_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package cosmosdb_test - -import ( - "testing" - - "github.com/treeverse/lakefs/pkg/kv/cosmosdb" - "github.com/treeverse/lakefs/pkg/kv/kvparams" - "github.com/treeverse/lakefs/pkg/kv/kvtest" -) - -func TestCosmosDB(t *testing.T) { - t.Skip("CosmosDB tests are flaky due to the emulator. If you plan on running those, make sure to assign at least 3CPUs and" + - " 4GB of memory to the container running the emulator.") - kvtest.DriverTest(t, cosmosdb.DriverName, kvparams.Config{CosmosDB: testParams}) -} diff --git a/pkg/kv/dynamodb/main_test.go b/pkg/kv/dynamodb/main_test.go index 7ba3b5b18d5..fad7372b6ea 100644 --- a/pkg/kv/dynamodb/main_test.go +++ b/pkg/kv/dynamodb/main_test.go @@ -10,22 +10,15 @@ import ( ) var testParams *kvparams.DynamoDB +var databaseURI string func TestMain(m *testing.M) { - databaseURI, cleanupFunc, err := testutil.GetDynamoDBInstance() + var err error + var cleanupFunc func() + databaseURI, cleanupFunc, err = testutil.GetDynamoDBInstance() if err != nil { log.Fatalf("Could not connect to Docker: %s", err) } - - testParams = &kvparams.DynamoDB{ - TableName: testutil.UniqueKVTableName(), - ScanLimit: 10, - Endpoint: databaseURI, - AwsRegion: "us-east-1", - AwsAccessKeyID: "fakeMyKeyId", - AwsSecretAccessKey: "fakeSecretAccessKey", - } - code := m.Run() cleanupFunc() os.Exit(code) diff --git a/pkg/kv/dynamodb/store.go b/pkg/kv/dynamodb/store.go index 45dfa76139e..fd857c6f4e5 100644 --- a/pkg/kv/dynamodb/store.go +++ b/pkg/kv/dynamodb/store.go @@ -380,6 +380,7 @@ func (s *Store) DropTable() error { func (e *EntriesIterator) SeekGE(key []byte) { if !e.isInRange(key) { e.startKey = key + e.exclusiveStartKey = nil e.runQuery() return } diff --git a/pkg/kv/dynamodb/store_test.go b/pkg/kv/dynamodb/store_test.go index e1f3aa00b8e..f61afc9daf2 100644 --- a/pkg/kv/dynamodb/store_test.go +++ b/pkg/kv/dynamodb/store_test.go @@ -1,13 +1,33 @@ package dynamodb_test import ( + "context" + "github.com/treeverse/lakefs/pkg/kv" + "github.com/treeverse/lakefs/pkg/kv/dynamodb" + "github.com/treeverse/lakefs/pkg/testutil" "testing" - "github.com/treeverse/lakefs/pkg/kv/dynamodb" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" ) func TestDynamoKV(t *testing.T) { - kvtest.DriverTest(t, dynamodb.DriverName, kvparams.Config{DynamoDB: testParams}) + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + testParams = &kvparams.DynamoDB{ + TableName: testutil.UniqueKVTableName(), + ScanLimit: kvtest.MaxPageSize, + Endpoint: databaseURI, + AwsRegion: "us-east-1", + AwsAccessKeyID: "fakeMyKeyId", + AwsSecretAccessKey: "fakeSecretAccessKey", + } + + store, err := kv.Open(ctx, kvparams.Config{DynamoDB: testParams, Type: dynamodb.DriverName}) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", dynamodb.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) } diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index fac88722f9a..c3466c34c09 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -21,6 +21,9 @@ type StoreWithCounter struct { ScanCalls int64 } +// MaxPageSize is the maximum page size for pagination tests +const MaxPageSize = 10 + func NewStoreWithCounter(store kv.Store) *StoreWithCounter { return &StoreWithCounter{Store: store} } @@ -46,81 +49,96 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { } } - t.Run("listing all values of partition", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) - if itr == nil { - t.Fatalf("failed to create partition iterator") - } - defer itr.Close() - names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) - if diffs := deep.Equal(names, []string{"a", "aa", "b", "c", "d"}); diffs != nil { - t.Fatalf("got wrong list of names: %v", diffs) - } - }) + t.Run("listing all values of partition", testPartitionIteratorListAll(ctx, store)) + t.Run("listing values SeekGE", listPartitionIteratorWithSeekGE(ctx, store)) + t.Run("count scans on successive SeekGE operations", testPartitionIteratorCountScansOnSeekGE(store, ctx)) + t.Run("failed SeekGE partition not found", testPartitionIteratorSeekGEOnPartitionNotFound(ctx, store)) + t.Run("SeekGE past end", testPartitionIteratorSeekGEPastEnd(ctx, store)) + t.Run("SeekGE seek back", testPartitionIteratorSeekGESeekBack(ctx, store)) + t.Run("listing values SeekGE after pagination", testPartitionIteratorSeekGEWithPagination(ctx, store)) +} - t.Run("listing values SeekGE", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) - if itr == nil { - t.Fatalf("failed to create partition iterator") - } - defer itr.Close() - for _, seekValue := range []string{"b", "aaa", "b"} { - itr.SeekGE([]byte(seekValue)) - names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) - if diffs := deep.Equal(names, []string{"b", "c", "d"}); diffs != nil { - t.Fatalf("got wrong list of names: %v", diffs) +func testPartitionIteratorSeekGEWithPagination(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + // load much more data + moreModelNames := []string{ + "da", "db", "dc", "dd", "de", "df", "dg", "dh", "di", "dj", + "dk", "dl", "dm", "dn", "do", "dp", "dq", "dr", "ds", "dt", + "du", "dv", "dw", "dx", "dy", "dz", + "z", + } + for _, name := range moreModelNames { + model := TestModel{Name: []byte(name)} + for _, partitionKey := range []string{firstPartitionKey, secondPartitionKey} { + err := kv.SetMsg(ctx, store, partitionKey, model.Name, &model) + if err != nil { + t.Fatalf("failed to set model (partition %s, name %s): %s", partitionKey, name, err) + } } } - }) - t.Run("count scans on successive SeekGE operations", func(t *testing.T) { - store := NewStoreWithCounter(store) itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) if itr == nil { t.Fatalf("failed to create partition iterator") } defer itr.Close() - for _, seekValue := range []string{"b", "c", "d"} { - itr.SeekGE([]byte(seekValue)) + + if !itr.Next() { + t.Fatal("expected Next to be true") + } + + itr.SeekGE([]byte("b")) + for i := 0; i < MaxPageSize+1; i++ { + // force pagination if !itr.Next() { - t.Fatalf("Expected iterator to have a value") - } - if err := itr.Err(); err != nil { - t.Fatalf("unexpected error: %v", err) - } - k := itr.Entry().Key - if string(k) != seekValue { - t.Fatalf("Expected to find value %s. Found %s", seekValue, k) + t.Fatal("expected Next to be true") } } - scanCalls := atomic.LoadInt64(&store.ScanCalls) - if scanCalls != 1 { - t.Fatalf("Expected exactly 1 call to Scan. got: %d", scanCalls) + + itr.SeekGE([]byte("z")) + if !itr.Next() { + t.Fatal("expected Next to be true") } - }) - t.Run("failed SeekGE partition not found", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), "", 0) + itr.SeekGE([]byte("d1")) + names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) + if diffs := deep.Equal(names, moreModelNames); diffs != nil { + t.Fatalf("got wrong list of names: %v", diffs) + } + } +} + +func testPartitionIteratorSeekGESeekBack(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) if itr == nil { t.Fatalf("failed to create partition iterator") } defer itr.Close() - itr.SeekGE([]byte("d")) + itr.SeekGE([]byte("z")) if itr.Next() { - t.Fatalf("next after seekGE expected to be false") + t.Fatal("expected Next to be false") } - - itr.SeekGE([]byte("d")) - if itr.Next() { - t.Fatalf("next after seekGE expected to be false") + if err := itr.Err(); err != nil { + t.Fatalf("unexpected error: %s", err) } - - if err := itr.Err(); !errors.Is(err, kv.ErrMissingPartitionKey) { - t.Fatalf("expected error: %s, got %v", kv.ErrMissingPartitionKey, err) + itr.SeekGE([]byte("a")) + if !itr.Next() { + t.Fatalf("expected Next to be true") } - }) + if err := itr.Err(); err != nil { + t.Fatalf("unexpected error: %s", err) + } + e := itr.Entry() + model := e.Value.(*TestModel) + if string(model.Name) != "a" { + t.Fatalf("expected value a from iterator") + } + } +} - t.Run("SeekGE past end", func(t *testing.T) { +func testPartitionIteratorSeekGEPastEnd(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) if itr == nil { t.Fatalf("failed to create partition iterator") @@ -142,34 +160,89 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { if err := itr.Err(); err != nil { t.Fatalf("unexpected error: %v", err) } - }) + } +} - t.Run("SeekGE seek back", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) +func testPartitionIteratorSeekGEOnPartitionNotFound(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), "", 0) if itr == nil { t.Fatalf("failed to create partition iterator") } defer itr.Close() - itr.SeekGE([]byte("z")) + itr.SeekGE([]byte("d")) if itr.Next() { - t.Fatal("expected Next to be false") + t.Fatalf("next after seekGE expected to be false") } - if err := itr.Err(); err != nil { - t.Fatalf("unexpected error: %s", err) + + itr.SeekGE([]byte("d")) + if itr.Next() { + t.Fatalf("next after seekGE expected to be false") } - itr.SeekGE([]byte("a")) - if !itr.Next() { - t.Fatalf("expected Next to be true") + + if err := itr.Err(); !errors.Is(err, kv.ErrMissingPartitionKey) { + t.Fatalf("expected error: %s, got %v", kv.ErrMissingPartitionKey, err) } - if err := itr.Err(); err != nil { - t.Fatalf("unexpected error: %s", err) + } +} + +func testPartitionIteratorCountScansOnSeekGE(store kv.Store, ctx context.Context) func(t *testing.T) { + return func(t *testing.T) { + store := NewStoreWithCounter(store) + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) + if itr == nil { + t.Fatalf("failed to create partition iterator") } - e := itr.Entry() - model := e.Value.(*TestModel) - if string(model.Name) != "a" { - t.Fatalf("expected value a from iterator") + defer itr.Close() + for _, seekValue := range []string{"b", "c", "d"} { + itr.SeekGE([]byte(seekValue)) + if !itr.Next() { + t.Fatalf("Expected iterator to have a value") + } + if err := itr.Err(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + k := itr.Entry().Key + if string(k) != seekValue { + t.Fatalf("Expected to find value %s. Found %s", seekValue, k) + } } - }) + scanCalls := atomic.LoadInt64(&store.ScanCalls) + if scanCalls != 1 { + t.Fatalf("Expected exactly 1 call to Scan. got: %d", scanCalls) + } + } +} + +func listPartitionIteratorWithSeekGE(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) + if itr == nil { + t.Fatalf("failed to create partition iterator") + } + defer itr.Close() + for _, seekValue := range []string{"b", "aaa", "b"} { + itr.SeekGE([]byte(seekValue)) + names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) + if diffs := deep.Equal(names, []string{"b", "c", "d"}); diffs != nil { + t.Fatalf("got wrong list of names: %v", diffs) + } + } + } +} + +func testPartitionIteratorListAll(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) + if itr == nil { + t.Fatalf("failed to create partition iterator") + } + defer itr.Close() + names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) + if diffs := deep.Equal(names, []string{"a", "aa", "b", "c", "d"}); diffs != nil { + t.Fatalf("got wrong list of names: %v", diffs) + } + } } // scanPartitionIterator scans the iterator and returns a slice of the results of applying fn to each model. diff --git a/pkg/kv/kvtest/store.go b/pkg/kv/kvtest/store.go index 37fcdb099a6..09ffba34e73 100644 --- a/pkg/kv/kvtest/store.go +++ b/pkg/kv/kvtest/store.go @@ -50,9 +50,8 @@ func sampleEntry(prefix string, n int) kv.Entry { return kv.Entry{Key: []byte(k), Value: []byte(v)} } -func DriverTest(t *testing.T, name string, params kvparams.Config) { +func DriverTest(t *testing.T, ms MakeStore) { t.Helper() - ms := MakeStoreByName(name, params) t.Run("Driver_Open", func(t *testing.T) { testDriverOpen(t, ms) }) t.Run("Store_SetGet", func(t *testing.T) { testStoreSetGet(t, ms) }) t.Run("Store_SetIf", func(t *testing.T) { testStoreSetIf(t, ms) }) @@ -457,19 +456,6 @@ func testStoreScan(t *testing.T, ms MakeStore) { }) } -func MakeStoreByName(name string, kvParams kvparams.Config) MakeStore { - return func(t testing.TB, ctx context.Context) kv.Store { - t.Helper() - kvParams.Type = name - store, err := kv.Open(ctx, kvParams) - if err != nil { - t.Fatalf("failed to open kv '%s' store: %s", name, err) - } - t.Cleanup(store.Close) - return store - } -} - func testStoreMissingArgument(t *testing.T, ms MakeStore) { ctx := context.Background() store := ms(t, ctx) diff --git a/pkg/kv/local/store_test.go b/pkg/kv/local/store_test.go index 29cd1f6aebc..79299032a35 100644 --- a/pkg/kv/local/store_test.go +++ b/pkg/kv/local/store_test.go @@ -1,19 +1,29 @@ package local_test import ( + "context" "testing" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" "github.com/treeverse/lakefs/pkg/kv/local" ) func TestLocalKV(t *testing.T) { - kvtest.DriverTest(t, local.DriverName, kvparams.Config{ - Type: local.DriverName, - Local: &kvparams.Local{ - Path: t.TempDir(), - EnableLogging: true, - }, + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{ + Type: local.DriverName, + Local: &kvparams.Local{ + Path: t.TempDir(), + EnableLogging: true, + }, + }) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", local.DriverName, err) + } + t.Cleanup(store.Close) + return store }) } diff --git a/pkg/kv/mem/store_test.go b/pkg/kv/mem/store_test.go index 02b5e3ebf31..cda2128da76 100644 --- a/pkg/kv/mem/store_test.go +++ b/pkg/kv/mem/store_test.go @@ -1,13 +1,25 @@ package mem_test import ( + "context" "testing" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" "github.com/treeverse/lakefs/pkg/kv/mem" ) func TestMemKV(t *testing.T) { - kvtest.DriverTest(t, mem.DriverName, kvparams.Config{}) + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{ + Type: mem.DriverName, + }) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", mem.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) } diff --git a/pkg/kv/msg_test.go b/pkg/kv/msg_test.go index 0e3d2fb5d0c..ccd6f9324fe 100644 --- a/pkg/kv/msg_test.go +++ b/pkg/kv/msg_test.go @@ -146,7 +146,7 @@ func BenchmarkDrivers(b *testing.B) { defer closer() dynamoStore := testutil.GetDynamoDBProd(ctx, b) - postgresStore := kvtest.MakeStoreByName(postgres.DriverName, kvparams.Config{Postgres: &kvparams.Postgres{ConnectionString: databaseURI}})(b, ctx) + postgresStore, err := kv.Open(ctx, kvparams.Config{Type: postgres.DriverName, Postgres: &kvparams.Postgres{ConnectionString: databaseURI}}) defer postgresStore.Close() tests := []struct { diff --git a/pkg/kv/postgres/main_test.go b/pkg/kv/postgres/main_test.go index 57cc8525c6c..d441a390064 100644 --- a/pkg/kv/postgres/main_test.go +++ b/pkg/kv/postgres/main_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/url" "os" "testing" @@ -17,16 +18,15 @@ const ( ) var ( - pool *dockertest.Pool - databaseURI string + pool *dockertest.Pool ) -func runDBInstance(dockerPool *dockertest.Pool) (string, func()) { +func runDBInstance(dockerPool *dockertest.Pool, dbName string) (string, func()) { ctx := context.Background() resource, err := dockerPool.Run("postgres", "11", []string{ "POSTGRES_USER=lakefs", "POSTGRES_PASSWORD=lakefs", - "POSTGRES_DB=lakefs_db", + "POSTGRES_DB=" + dbName, }) if err != nil { panic("Could not start postgresql: " + err.Error()) @@ -49,7 +49,7 @@ func runDBInstance(dockerPool *dockertest.Pool) (string, func()) { // create connection var pgPool *pgxpool.Pool port := resource.GetPort("5432/tcp") - uri := fmt.Sprintf("postgres://lakefs:lakefs@localhost:%s/lakefs_db?sslmode=disable", port) + uri := fmt.Sprintf("postgres://lakefs:lakefs@localhost:%s/%s?sslmode=disable", port, url.PathEscape(dbName)) err = dockerPool.Retry(func() error { var err error pgPool, err = pgxpool.New(ctx, uri) @@ -73,9 +73,6 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("Could not connect to Docker: %s", err) } - var cleanup func() - databaseURI, cleanup = runDBInstance(pool) code := m.Run() - cleanup() os.Exit(code) } diff --git a/pkg/kv/postgres/store_test.go b/pkg/kv/postgres/store_test.go index 93d2f10e224..b4628cfe370 100644 --- a/pkg/kv/postgres/store_test.go +++ b/pkg/kv/postgres/store_test.go @@ -1,13 +1,48 @@ package postgres_test import ( + "context" + "fmt" + "net/url" "testing" + "github.com/jackc/pgx/v5" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" "github.com/treeverse/lakefs/pkg/kv/postgres" + "github.com/treeverse/lakefs/pkg/testutil" ) func TestPostgresKV(t *testing.T) { - kvtest.DriverTest(t, postgres.DriverName, kvparams.Config{Postgres: &kvparams.Postgres{ConnectionString: databaseURI, ScanPageSize: 10}}) + databaseURI, cleanup := runDBInstance(pool, testutil.UniqueKVTableName()) + t.Cleanup(cleanup) + + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + + conn, err := pgx.Connect(ctx, databaseURI) + if err != nil { + t.Fatalf("Unable to connect to database: %v", err) + } + defer conn.Close(ctx) + + // create a new schema per test + schemaName := "test_schema" + testutil.UniqueName() + _, err = conn.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+url.PathEscape(schemaName)) + _, err = conn.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)) + if err != nil { + t.Fatalf("Error creating schema '%s': %s", schemaName, err) + } + + store, err := kv.Open(ctx, kvparams.Config{ + Type: postgres.DriverName, + Postgres: &kvparams.Postgres{ConnectionString: fmt.Sprintf("%s&search_path=%s", databaseURI, url.PathEscape(schemaName)), ScanPageSize: kvtest.MaxPageSize}, + }) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", postgres.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) } diff --git a/pkg/testutil/dynamodb.go b/pkg/testutil/dynamodb.go index 5e99e053bd6..670b9657af3 100644 --- a/pkg/testutil/dynamodb.go +++ b/pkg/testutil/dynamodb.go @@ -78,7 +78,11 @@ func GetDynamoDBInstance() (string, func(), error) { } func UniqueKVTableName() string { - return "kvstore_" + nanoid.MustGenerate(chars, charsSize) + return "kvstore_" + UniqueName() +} + +func UniqueName() string { + return nanoid.MustGenerate(chars, charsSize) } func GetDynamoDBProd(ctx context.Context, tb testing.TB) kv.Store {