From 7b94bc218e63905e0183499ec61ea2d4b5b68c42 Mon Sep 17 00:00:00 2001 From: Maximilien Cuony Date: Wed, 22 Apr 2026 11:13:43 +0200 Subject: [PATCH 1/6] [store] Squash datastore.Store and datastore.Datastore --- cmds/db-manager/migration/migrate.go | 4 +- pkg/aux_/store/datastore/store.go | 4 +- pkg/aux_/store/datastore/store_test.go | 5 +- pkg/datastore/datastore.go | 215 ---------------------- pkg/datastore/store.go | 226 ++++++++++++++++++++++-- pkg/rid/application/application_test.go | 4 +- pkg/rid/store/datastore/store.go | 4 +- pkg/rid/store/datastore/store_test.go | 8 +- pkg/scd/store/datastore/store.go | 4 +- pkg/scd/store/datastore/store_test.go | 5 +- 10 files changed, 227 insertions(+), 252 deletions(-) delete mode 100644 pkg/datastore/datastore.go diff --git a/cmds/db-manager/migration/migrate.go b/cmds/db-manager/migration/migrate.go index 086f022b1..11c6acc1c 100644 --- a/cmds/db-manager/migration/migrate.go +++ b/cmds/db-manager/migration/migrate.go @@ -225,11 +225,11 @@ func migrate(cmd *cobra.Command, _ []string) error { return nil } -func connectTo(ctx context.Context, dbName string) (*datastore.Datastore, error) { +func connectTo(ctx context.Context, dbName string) (*datastore.Store[any], error) { // Connect to database server connectParameters := params.GetConnectParameters() connectParameters.DBName = dbName - return datastore.Dial(ctx, connectParameters) + return datastore.Dial[any](ctx, connectParameters) } func enumerateMigrationSteps(path *string) ([]MigrationStep, error) { diff --git a/pkg/aux_/store/datastore/store.go b/pkg/aux_/store/datastore/store.go index 616513853..0731c6368 100644 --- a/pkg/aux_/store/datastore/store.go +++ b/pkg/aux_/store/datastore/store.go @@ -31,7 +31,7 @@ type Store struct { datastore.Store[repos.Repository] } -func newStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger) (*Store, error) { +func newStore(ctx context.Context, db *datastore.Store[repos.Repository], logger *zap.Logger) (*Store, error) { s := &Store{} @@ -52,7 +52,7 @@ func newStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger) func Dial(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*Store, error) { - store, err := datastore.DialStore(ctx, "aux", withCheckCron, func(db *datastore.Datastore) (*Store, error) { + store, err := datastore.DialStore(ctx, "aux", withCheckCron, func(db *datastore.Store[repos.Repository]) (*Store, error) { return newStore(ctx, db, logger) }) diff --git a/pkg/aux_/store/datastore/store_test.go b/pkg/aux_/store/datastore/store_test.go index 3bd3f7b7a..1711dc434 100644 --- a/pkg/aux_/store/datastore/store_test.go +++ b/pkg/aux_/store/datastore/store_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/interuss/dss/pkg/aux_/repos" "github.com/interuss/dss/pkg/datastore" "github.com/interuss/dss/pkg/datastore/params" "github.com/interuss/dss/pkg/logging" @@ -33,7 +34,7 @@ func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { } func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*Store, error) { - db, err := datastore.Dial(ctx, connectParameters) + db, err := datastore.Dial[repos.Repository](ctx, connectParameters) require.NoError(t, err) s, err := newStore(ctx, db, logging.Logger) @@ -51,6 +52,6 @@ func cleanUp(ctx context.Context, s *Store) error { DELETE FROM dss_metadata WHERE locality IS NOT NULL; ` - _, err := s.DB.Pool.Exec(ctx, query) + _, err := s.Pool.Exec(ctx, query) return err } diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go deleted file mode 100644 index a52f4dca0..000000000 --- a/pkg/datastore/datastore.go +++ /dev/null @@ -1,215 +0,0 @@ -package datastore - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/coreos/go-semver/semver" - "github.com/exaring/otelpgx" - "github.com/interuss/dss/pkg/datastore/params" - "github.com/interuss/dss/pkg/logging" - "github.com/interuss/stacktrace" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/robfig/cron/v3" - "go.uber.org/zap" -) - -// datastore.Datastore is information about database that a datastore.Store is using. -// (just a subset of datastore.Store fields with methods that only need those fields) -type Datastore struct { - Version *Version - Pool *pgxpool.Pool -} - -const ( - CodeRetryable = stacktrace.ErrorCode(1) -) - -var UnknownVersion = &semver.Version{} - -func checkDatabase(ctx context.Context, db *Datastore, databaseName string) { - logger := logging.WithValuesFromContext(ctx, logging.Logger) - statsPtr := db.Pool.Stat() - if int(statsPtr.TotalConns()) == 0 { - logger.Warn("Failed periodic DB Ping (TotalConns=0)", zap.String("Database", databaseName)) - } else { - logger.Info("Successful periodic DB Ping", zap.String("Database", databaseName)) - } -} - -func DialStore[S any](ctx context.Context, dbName string, withCheckCron bool, newStore func(*Datastore) (S, error)) (S, error) { - - var zero S - - cp := params.GetConnectParameters() - cp.DBName = dbName - - db, err := Dial(ctx, cp) - - if err != nil { - if strings.Contains(err.Error(), "connect: connection refused") { - return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to datastore server for %s", dbName) - } - return zero, stacktrace.Propagate(err, "Failed to connect to %s database", dbName) - } - s, err := newStore(db) - if err != nil { - db.Pool.Close() - if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), fmt.Sprintf("database \"%s\" does not exist", dbName)) || strings.Contains(err.Error(), "database has not been bootstrapped with Schema Manager") { - return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to create %s store", dbName) - } - return zero, stacktrace.Propagate(err, "Failed to create %s store", dbName) - } - - if withCheckCron { - c := cron.New() - if _, err := c.AddFunc("@every 1m", func() { checkDatabase(ctx, db, dbName) }); err != nil { - db.Pool.Close() - return zero, stacktrace.Propagate(err, "Failed to schedule db check for %s", dbName) - } - c.Start() - - go func() { - <-ctx.Done() - c.Stop() - }() - } - - return s, nil -} - -func Dial(ctx context.Context, connParams params.ConnectParameters) (*Datastore, error) { - dsn, err := connParams.BuildDSN() - if err != nil { - return nil, stacktrace.Propagate(err, "Failed to create connection config for pgx") - } - - config, err := pgxpool.ParseConfig(dsn) - if err != nil { - return nil, stacktrace.Propagate(err, "Failed to parse connection config for pgx") - } - - if connParams.SSL.Mode == "enable" { - config.ConnConfig.TLSConfig.ServerName = connParams.Host - } - config.MaxConns = int32(connParams.MaxOpenConns) - config.MaxConnIdleTime = (time.Duration(connParams.MaxConnIdleSeconds) * time.Second) - config.HealthCheckPeriod = (1 * time.Second) - config.MinConns = 1 - - config.ConnConfig.Tracer = otelpgx.NewTracer() - - dbPool, err := pgxpool.NewWithConfig(ctx, config) - if err != nil { - return nil, err - } - - ds, err := initDatastore(ctx, dbPool) - if err != nil { - return nil, stacktrace.Propagate(err, "Failed to connect to datastore") - } - return ds, nil -} - -func initDatastore(ctx context.Context, pool *pgxpool.Pool) (*Datastore, error) { - version, err := fetchVersion(ctx, pool) - if err != nil { - return nil, err - } - - if version.Type == CockroachDB { - return &Datastore{Version: version, Pool: pool}, nil - } - if version.Type == Yugabyte { - return &Datastore{Version: version, Pool: pool}, nil - } - return nil, stacktrace.NewError("%s is not implemented yet", version.Type) -} - -func fetchVersion(ctx context.Context, pool *pgxpool.Pool) (*Version, error) { - const versionDbQuery = ` - SELECT version(); - ` - var fullVersion string - err := pool.QueryRow(ctx, versionDbQuery).Scan(&fullVersion) - if err != nil { - return nil, stacktrace.Propagate(err, "Error querying datastore version") - } - - return NewVersion(fullVersion) -} - -func (ds *Datastore) CreateDatabase(ctx context.Context, dbName string) error { - createDB := fmt.Sprintf("CREATE DATABASE %s", dbName) - if _, err := ds.Pool.Exec(ctx, createDB); err != nil { - return stacktrace.Propagate(err, "failed to create new database %s", dbName) - } - return nil -} - -func (ds *Datastore) DatabaseExists(ctx context.Context, dbName string) (bool, error) { - const checkDbQuery = ` - SELECT EXISTS ( - SELECT * FROM pg_database WHERE datname = $1 - )` - - var exists bool - if err := ds.Pool.QueryRow(ctx, checkDbQuery, dbName).Scan(&exists); err != nil { - return false, stacktrace.Propagate(err, "Error checking %s database existence", dbName) - } - - return exists, nil -} - -// GetSchemaVersion returns the Schema Version of the requested DB Name -func (ds *Datastore) GetSchemaVersion(ctx context.Context, dbName string) (*semver.Version, error) { - if dbName == "" { - return nil, stacktrace.NewError("GetSchemaVersion was provided with an empty database name") - } - if ds.Version.Type == Yugabyte && dbName != ds.Pool.Config().ConnConfig.Database { - return nil, stacktrace.NewError("Yugabyte do not support switching databases with the same connection. Unable to retrieve schema version for database %s while connected to %s.", dbName, ds.Pool.Config().ConnConfig.Database) - } - - var ( - checkTableQuery = fmt.Sprintf(` - SELECT EXISTS ( - SELECT - * - FROM - %s.information_schema.tables - WHERE - table_name = 'schema_versions' - AND - table_catalog = $1 - )`, dbName) - exists bool - getVersionQuery = ` - SELECT - schema_version - FROM - schema_versions - WHERE - onerow_enforcer = TRUE` - ) - - if err := ds.Pool.QueryRow(ctx, checkTableQuery, dbName).Scan(&exists); err != nil { - return nil, stacktrace.Propagate(err, "Error scanning table listing row") - } - - if !exists { - // Database has not been bootstrapped using DB Schema Manager - return UnknownVersion, nil - } - - var dbVersion string - if err := ds.Pool.QueryRow(ctx, getVersionQuery).Scan(&dbVersion); err != nil { - return nil, stacktrace.Propagate(err, "Error scanning version row") - } - if len(dbVersion) > 0 && dbVersion[0] == 'v' { - dbVersion = dbVersion[1:] - } - - return semver.NewVersion(dbVersion) -} diff --git a/pkg/datastore/store.go b/pkg/datastore/store.go index fea41de8e..ad5a100e2 100644 --- a/pkg/datastore/store.go +++ b/pkg/datastore/store.go @@ -2,26 +2,43 @@ package datastore import ( "context" + "fmt" + "strings" + "time" "github.com/cockroachdb/cockroach-go/v2/crdb" crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/coreos/go-semver/semver" + "github.com/exaring/otelpgx" + "github.com/interuss/dss/pkg/datastore/params" + "github.com/interuss/dss/pkg/logging" dsssql "github.com/interuss/dss/pkg/sql" "github.com/interuss/stacktrace" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "github.com/jonboulle/clockwork" + "github.com/robfig/cron/v3" + "go.uber.org/zap" ) -// datastore.Store is a partial implementation of store.Store when the data backing is a database. +const ( + CodeRetryable = stacktrace.ErrorCode(1) +) + +var UnknownVersion = &semver.Version{} + +// Store is a partial implementation of store.Store when the data backing is a database. +// It also carries the database connection (Pool) and its Version. type Store[R any] struct { - DB *Datastore - Clock clockwork.Clock - version *semver.Version - newRepo func(dsssql.Queryable) R - maxRetries int + Pool *pgxpool.Pool + Version *Version + Clock clockwork.Clock + schemaVersion *semver.Version + newRepo func(dsssql.Queryable) R + maxRetries int } -func NewStore[R any](ctx context.Context, db *Datastore, maxRetries int, crdbExpected int64, ybExpected int64, newRepo func(dsssql.Queryable) R) (Store[R], error) { +func NewStore[R any](ctx context.Context, db *Store[R], maxRetries int, crdbExpected int64, ybExpected int64, newRepo func(dsssql.Queryable) R) (Store[R], error) { dbName := db.Pool.Config().ConnConfig.Database @@ -30,33 +47,32 @@ func NewStore[R any](ctx context.Context, db *Datastore, maxRetries int, crdbExp return Store[R]{}, stacktrace.Propagate(err, "Failed to get schema version for %s", dbName) } - err = checkMajorSchemaVersion(ctx, db, vs, crdbExpected, ybExpected) - - if err != nil { + if err := checkMajorSchemaVersion(ctx, db, vs, crdbExpected, ybExpected); err != nil { return Store[R]{}, err } return Store[R]{ - DB: db, - Clock: clockwork.NewRealClock(), - version: vs, - newRepo: newRepo, - maxRetries: maxRetries, + Pool: db.Pool, + Version: db.Version, + Clock: clockwork.NewRealClock(), + schemaVersion: vs, + newRepo: newRepo, + maxRetries: maxRetries, }, nil } func (s *Store[R]) Interact(_ context.Context) (R, error) { - return s.newRepo(s.DB.Pool), nil + return s.newRepo(s.Pool), nil } func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error { ctx = crdb.WithMaxRetries(ctx, s.maxRetries) - return crdbpgx.ExecuteTx(ctx, s.DB.Pool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { + return crdbpgx.ExecuteTx(ctx, s.Pool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { return f(ctx, s.newRepo(tx)) }) } -func checkMajorSchemaVersion(ctx context.Context, db *Datastore, vs *semver.Version, crdbExpected int64, ybExpected int64) error { +func checkMajorSchemaVersion[R any](ctx context.Context, db *Store[R], vs *semver.Version, crdbExpected int64, ybExpected int64) error { if vs == UnknownVersion { return stacktrace.NewError("%s has not been bootstrapped with Schema Manager, please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas", db.Pool.Config().ConnConfig.Database) } @@ -70,6 +86,178 @@ func checkMajorSchemaVersion(ctx context.Context, db *Datastore, vs *semver.Vers } func (s *Store[R]) Close() error { - s.DB.Pool.Close() + s.Pool.Close() return nil } + +func checkDatabase[R any](ctx context.Context, db *Store[R], databaseName string) { + logger := logging.WithValuesFromContext(ctx, logging.Logger) + statsPtr := db.Pool.Stat() + if int(statsPtr.TotalConns()) == 0 { + logger.Warn("Failed periodic DB Ping (TotalConns=0)", zap.String("Database", databaseName)) + } else { + logger.Info("Successful periodic DB Ping", zap.String("Database", databaseName)) + } +} + +func DialStore[R any, S any](ctx context.Context, dbName string, withCheckCron bool, newStore func(*Store[R]) (S, error)) (S, error) { + + var zero S + + cp := params.GetConnectParameters() + cp.DBName = dbName + + db, err := Dial[R](ctx, cp) + + if err != nil { + if strings.Contains(err.Error(), "connect: connection refused") { + return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to datastore server for %s", dbName) + } + return zero, stacktrace.Propagate(err, "Failed to connect to %s database", dbName) + } + s, err := newStore(db) + if err != nil { + db.Pool.Close() + if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), fmt.Sprintf("database \"%s\" does not exist", dbName)) || strings.Contains(err.Error(), "database has not been bootstrapped with Schema Manager") { + return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to create %s store", dbName) + } + return zero, stacktrace.Propagate(err, "Failed to create %s store", dbName) + } + + if withCheckCron { + c := cron.New() + if _, err := c.AddFunc("@every 1m", func() { checkDatabase(ctx, db, dbName) }); err != nil { + db.Pool.Close() + return zero, stacktrace.Propagate(err, "Failed to schedule db check for %s", dbName) + } + c.Start() + + go func() { + <-ctx.Done() + c.Stop() + }() + } + + return s, nil +} + +func Dial[R any](ctx context.Context, connParams params.ConnectParameters) (*Store[R], error) { + dsn, err := connParams.BuildDSN() + if err != nil { + return nil, stacktrace.Propagate(err, "Failed to create connection config for pgx") + } + + config, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, stacktrace.Propagate(err, "Failed to parse connection config for pgx") + } + + if connParams.SSL.Mode == "enable" { + config.ConnConfig.TLSConfig.ServerName = connParams.Host + } + config.MaxConns = int32(connParams.MaxOpenConns) + config.MaxConnIdleTime = (time.Duration(connParams.MaxConnIdleSeconds) * time.Second) + config.HealthCheckPeriod = (1 * time.Second) + config.MinConns = 1 + + config.ConnConfig.Tracer = otelpgx.NewTracer() + + dbPool, err := pgxpool.NewWithConfig(ctx, config) + if err != nil { + return nil, err + } + + const versionDbQuery = ` + SELECT version(); + ` + var fullVersion string + err = dbPool.QueryRow(ctx, versionDbQuery).Scan(&fullVersion) + if err != nil { + return nil, stacktrace.Propagate(err, "Error querying datastore version") + } + + version, err := NewVersion(fullVersion) + + if err != nil { + return nil, stacktrace.Propagate(err, "Error querying datastore version") + } + + if version.Type == CockroachDB || version.Type == Yugabyte { + return &Store[R]{Version: version, Pool: dbPool}, nil + } + + return nil, stacktrace.NewError("%s is not implemented yet", version.Type) +} + +func (s *Store[R]) CreateDatabase(ctx context.Context, dbName string) error { + createDB := fmt.Sprintf("CREATE DATABASE %s", dbName) + if _, err := s.Pool.Exec(ctx, createDB); err != nil { + return stacktrace.Propagate(err, "failed to create new database %s", dbName) + } + return nil +} + +func (s *Store[R]) DatabaseExists(ctx context.Context, dbName string) (bool, error) { + const checkDbQuery = ` + SELECT EXISTS ( + SELECT * FROM pg_database WHERE datname = $1 + )` + + var exists bool + if err := s.Pool.QueryRow(ctx, checkDbQuery, dbName).Scan(&exists); err != nil { + return false, stacktrace.Propagate(err, "Error checking %s database existence", dbName) + } + + return exists, nil +} + +// GetSchemaVersion returns the Schema Version of the requested DB Name +func (s *Store[R]) GetSchemaVersion(ctx context.Context, dbName string) (*semver.Version, error) { + if dbName == "" { + return nil, stacktrace.NewError("GetSchemaVersion was provided with an empty database name") + } + if s.Version.Type == Yugabyte && dbName != s.Pool.Config().ConnConfig.Database { + return nil, stacktrace.NewError("Yugabyte do not support switching databases with the same connection. Unable to retrieve schema version for database %s while connected to %s.", dbName, s.Pool.Config().ConnConfig.Database) + } + + var ( + checkTableQuery = fmt.Sprintf(` + SELECT EXISTS ( + SELECT + * + FROM + %s.information_schema.tables + WHERE + table_name = 'schema_versions' + AND + table_catalog = $1 + )`, dbName) + exists bool + getVersionQuery = ` + SELECT + schema_version + FROM + schema_versions + WHERE + onerow_enforcer = TRUE` + ) + + if err := s.Pool.QueryRow(ctx, checkTableQuery, dbName).Scan(&exists); err != nil { + return nil, stacktrace.Propagate(err, "Error scanning table listing row") + } + + if !exists { + // Database has not been bootstrapped using DB Schema Manager + return UnknownVersion, nil + } + + var dbVersion string + if err := s.Pool.QueryRow(ctx, getVersionQuery).Scan(&dbVersion); err != nil { + return nil, stacktrace.Propagate(err, "Error scanning version row") + } + if len(dbVersion) > 0 && dbVersion[0] == 'v' { + dbVersion = dbVersion[1:] + } + + return semver.NewVersion(dbVersion) +} diff --git a/pkg/rid/application/application_test.go b/pkg/rid/application/application_test.go index 2b0f06695..064558a3c 100644 --- a/pkg/rid/application/application_test.go +++ b/pkg/rid/application/application_test.go @@ -61,7 +61,7 @@ func setUpStore(ctx context.Context, t *testing.T, logger *zap.Logger) (store.St connectParameters.DBName = "rid" - ridDatastore, err := datastore.Dial(ctx, connectParameters) + ridDatastore, err := datastore.Dial[repos.Repository](ctx, connectParameters) require.NoError(t, err) logger.Info("using datastore.") @@ -82,7 +82,7 @@ func cleanUp(ctx context.Context, s *ridc.Store) error { DELETE FROM subscriptions WHERE id IS NOT NULL; DELETE FROM identification_service_areas WHERE id IS NOT NULL;` - _, err := s.DB.Pool.Exec(ctx, query) + _, err := s.Pool.Exec(ctx, query) return err } diff --git a/pkg/rid/store/datastore/store.go b/pkg/rid/store/datastore/store.go index ef5ee204a..331375445 100644 --- a/pkg/rid/store/datastore/store.go +++ b/pkg/rid/store/datastore/store.go @@ -33,7 +33,7 @@ type Store struct { datastore.Store[repos.Repository] } -func NewStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger) (*Store, error) { +func NewStore(ctx context.Context, db *datastore.Store[repos.Repository], logger *zap.Logger) (*Store, error) { s := &Store{} @@ -53,7 +53,7 @@ func NewStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger) func Dial(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*Store, error) { - store, err := datastore.DialStore(ctx, "rid", withCheckCron, func(db *datastore.Datastore) (*Store, error) { + store, err := datastore.DialStore(ctx, "rid", withCheckCron, func(db *datastore.Store[repos.Repository]) (*Store, error) { return NewStore(ctx, db, logger) }) diff --git a/pkg/rid/store/datastore/store_test.go b/pkg/rid/store/datastore/store_test.go index be4d27113..3566cc0ac 100644 --- a/pkg/rid/store/datastore/store_test.go +++ b/pkg/rid/store/datastore/store_test.go @@ -43,7 +43,7 @@ func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { } func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*Store, error) { - db, err := datastore.Dial(ctx, connectParameters) + db, err := datastore.Dial[repos.Repository](ctx, connectParameters) require.NoError(t, err) s, err := NewStore(ctx, db, logging.Logger) @@ -61,7 +61,7 @@ func cleanUp(ctx context.Context, s *Store) error { DELETE FROM subscriptions WHERE id IS NOT NULL; DELETE FROM identification_service_areas WHERE id IS NOT NULL;` - _, err := s.DB.Pool.Exec(ctx, query) + _, err := s.Pool.Exec(ctx, query) return err } @@ -201,7 +201,7 @@ func TestBasicTxn(t *testing.T) { subscription1 := subscriptionsPool[0].input subscription2 := subscriptionsPool[1].input - tx1, err := store.DB.Pool.Begin(ctx) + tx1, err := store.Pool.Begin(ctx) require.NoError(t, err) s1 := &repo{ Queryable: tx1, @@ -209,7 +209,7 @@ func TestBasicTxn(t *testing.T) { clock: clockwork.NewRealClock(), } - tx2, err := store.DB.Pool.Begin(ctx) + tx2, err := store.Pool.Begin(ctx) require.NoError(t, err) s2 := &repo{ diff --git a/pkg/scd/store/datastore/store.go b/pkg/scd/store/datastore/store.go index bf68f5c32..20ea5a794 100644 --- a/pkg/scd/store/datastore/store.go +++ b/pkg/scd/store/datastore/store.go @@ -34,7 +34,7 @@ type Store struct { datastore.Store[repos.Repository] } -func newStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger, globalLock bool) (*Store, error) { +func newStore(ctx context.Context, db *datastore.Store[repos.Repository], logger *zap.Logger, globalLock bool) (*Store, error) { s := &Store{} @@ -55,7 +55,7 @@ func newStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger, func Dial(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (*Store, error) { - store, err := datastore.DialStore(ctx, "scd", withCheckCron, func(db *datastore.Datastore) (*Store, error) { + store, err := datastore.DialStore(ctx, "scd", withCheckCron, func(db *datastore.Store[repos.Repository]) (*Store, error) { return newStore(ctx, db, logger, globalLock) }) diff --git a/pkg/scd/store/datastore/store_test.go b/pkg/scd/store/datastore/store_test.go index 89c02c246..850a0c1fa 100644 --- a/pkg/scd/store/datastore/store_test.go +++ b/pkg/scd/store/datastore/store_test.go @@ -7,6 +7,7 @@ import ( "github.com/interuss/dss/pkg/datastore" "github.com/interuss/dss/pkg/datastore/params" "github.com/interuss/dss/pkg/logging" + "github.com/interuss/dss/pkg/scd/repos" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) @@ -33,7 +34,7 @@ func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { } func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*Store, error) { - db, err := datastore.Dial(ctx, connectParameters) + db, err := datastore.Dial[repos.Repository](ctx, connectParameters) require.NoError(t, err) s, err := newStore(ctx, db, logging.Logger, false) @@ -53,6 +54,6 @@ func cleanUp(ctx context.Context, s *Store) error { DELETE FROM scd_constraints WHERE id IS NOT NULL; DELETE FROM scd_uss_availability WHERE id IS NOT NULL;` - _, err := s.DB.Pool.Exec(ctx, query) + _, err := s.Pool.Exec(ctx, query) return err } From 20eeb18f1ac6005ba452e2f85b4dd00f4656f106 Mon Sep 17 00:00:00 2001 From: Maximilien Cuony Date: Wed, 22 Apr 2026 14:10:52 +0200 Subject: [PATCH 2/6] [store] Revork init structure; use Store interfaces instead of datastore.Store interfaces --- cmds/core-service/main.go | 12 +- cmds/db-manager/cleanup/evict.go | 8 +- pkg/aux_/store/datastore/store.go | 47 +++----- pkg/aux_/store/datastore/store_test.go | 10 +- pkg/aux_/store/store.go | 9 ++ pkg/datastore/store.go | 149 +++++++++++------------- pkg/rid/application/application_test.go | 7 +- pkg/rid/store/datastore/store.go | 47 +++----- pkg/rid/store/datastore/store_test.go | 10 +- pkg/rid/store/store.go | 9 ++ pkg/scd/store/datastore/store.go | 49 +++----- pkg/scd/store/datastore/store_test.go | 10 +- pkg/scd/store/store.go | 9 ++ 13 files changed, 168 insertions(+), 208 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index d81210c8e..fa5f83e95 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -21,16 +21,16 @@ import ( apiversioningv1 "github.com/interuss/dss/pkg/api/versioningv1" "github.com/interuss/dss/pkg/auth" aux "github.com/interuss/dss/pkg/aux_" - auxc "github.com/interuss/dss/pkg/aux_/store/datastore" + auxs "github.com/interuss/dss/pkg/aux_/store" "github.com/interuss/dss/pkg/build" "github.com/interuss/dss/pkg/datastore" "github.com/interuss/dss/pkg/logging" "github.com/interuss/dss/pkg/rid/application" rid_v1 "github.com/interuss/dss/pkg/rid/server/v1" rid_v2 "github.com/interuss/dss/pkg/rid/server/v2" - ridc "github.com/interuss/dss/pkg/rid/store/datastore" + rids "github.com/interuss/dss/pkg/rid/store" "github.com/interuss/dss/pkg/scd" - scdc "github.com/interuss/dss/pkg/scd/store/datastore" + scds "github.com/interuss/dss/pkg/scd/store" "github.com/interuss/dss/pkg/version" "github.com/interuss/dss/pkg/versioning" "github.com/interuss/stacktrace" @@ -84,7 +84,7 @@ func createKeyResolver() (auth.KeyResolver, error) { } func createAuxServer(ctx context.Context, locality string, publicEndpoint string, scdGlobalLock bool, logger *zap.Logger) (*aux.Server, error) { - auxStore, err := auxc.Dial(ctx, logger, true) + auxStore, err := auxs.Init(ctx, logger, true) if err != nil { return nil, err } @@ -105,7 +105,7 @@ func createAuxServer(ctx context.Context, locality string, publicEndpoint string func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) (*rid_v1.Server, *rid_v2.Server, error) { - ridStore, err := ridc.Dial(ctx, logger, true) + ridStore, err := rids.Init(ctx, logger, true) if err != nil { return nil, nil, err } @@ -129,7 +129,7 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, error) { - scdStore, err := scdc.Dial(ctx, logger, true, *scdGlobalLock) + scdStore, err := scds.Init(ctx, logger, true, *scdGlobalLock) if err != nil { return nil, err } diff --git a/cmds/db-manager/cleanup/evict.go b/cmds/db-manager/cleanup/evict.go index ac2839bbc..9584ef620 100644 --- a/cmds/db-manager/cleanup/evict.go +++ b/cmds/db-manager/cleanup/evict.go @@ -10,10 +10,10 @@ import ( dssmodels "github.com/interuss/dss/pkg/models" ridmodels "github.com/interuss/dss/pkg/rid/models" ridrepos "github.com/interuss/dss/pkg/rid/repos" - ridc "github.com/interuss/dss/pkg/rid/store/datastore" + rids "github.com/interuss/dss/pkg/rid/store" scdmodels "github.com/interuss/dss/pkg/scd/models" scdrepos "github.com/interuss/dss/pkg/scd/repos" - scdc "github.com/interuss/dss/pkg/scd/store/datastore" + scds "github.com/interuss/dss/pkg/scd/store" "github.com/interuss/stacktrace" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -54,12 +54,12 @@ func evict(cmd *cobra.Command, _ []string) error { logger := logging.WithValuesFromContext(ctx, logging.Logger) - scdStore, err := scdc.Dial(ctx, logger, false, false) + scdStore, err := scds.Init(ctx, logger, false, false) if err != nil { return err } - ridStore, err := ridc.Dial(ctx, logger, false) + ridStore, err := rids.Init(ctx, logger, false) if err != nil { return err } diff --git a/pkg/aux_/store/datastore/store.go b/pkg/aux_/store/datastore/store.go index 0731c6368..17eb07480 100644 --- a/pkg/aux_/store/datastore/store.go +++ b/pkg/aux_/store/datastore/store.go @@ -5,7 +5,6 @@ import ( "github.com/interuss/dss/pkg/aux_/repos" "github.com/interuss/dss/pkg/datastore" - "github.com/interuss/dss/pkg/datastore/params" "github.com/interuss/dss/pkg/logging" dssql "github.com/interuss/dss/pkg/sql" "github.com/jonboulle/clockwork" @@ -25,36 +24,20 @@ type repo struct { version *datastore.Version } -// aux_.store.datastore.Store is a concrete store.Store[aux_.repos.Repository] providing the +// Init initializes the SQL-backed rid store. It return a concrete datastore.Store[aux_.repos.Repository] providing the // ability to interact with a database-backed store of aux information. -type Store struct { - datastore.Store[repos.Repository] -} - -func newStore(ctx context.Context, db *datastore.Store[repos.Repository], logger *zap.Logger) (*Store, error) { - - s := &Store{} - - base, err := datastore.NewStore(ctx, db, params.GetConnectParameters().MaxRetries, currentCrdbMajorSchemaVersion, currentYugabyteMajorSchemaVersion, func(q dssql.Queryable) repos.Repository { - return &repo{ - Queryable: q, - clock: s.Clock, - logger: logging.WithValuesFromContext(ctx, logger), - version: db.Version, - } - }) - if err != nil { - return nil, err - } - s.Store = base - return s, nil -} - -func Dial(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*Store, error) { - - store, err := datastore.DialStore(ctx, "aux", withCheckCron, func(db *datastore.Store[repos.Repository]) (*Store, error) { - return newStore(ctx, db, logger) - }) - - return store, err +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*datastore.Store[repos.Repository], error) { + return datastore.Init(ctx, datastore.Config[repos.Repository]{ + DBName: "aux", + CrdbMajorSchemaVersion: currentCrdbMajorSchemaVersion, + YbMajorSchemaVersion: currentYugabyteMajorSchemaVersion, + NewRepo: func(q dssql.Queryable, clock clockwork.Clock, version *datastore.Version) repos.Repository { + return &repo{ + Queryable: q, + clock: clock, + logger: logging.WithValuesFromContext(ctx, logger), + version: version, + } + }, + }, withCheckCron) } diff --git a/pkg/aux_/store/datastore/store_test.go b/pkg/aux_/store/datastore/store_test.go index 1711dc434..62513a3f4 100644 --- a/pkg/aux_/store/datastore/store_test.go +++ b/pkg/aux_/store/datastore/store_test.go @@ -16,7 +16,7 @@ var ( fakeClock = clockwork.NewFakeClock() ) -func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { +func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repository], func()) { connectParameters := params.GetConnectParameters() if connectParameters.Host == "" || connectParameters.Port == 0 { t.Skip() @@ -33,11 +33,9 @@ func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { } } -func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*Store, error) { - db, err := datastore.Dial[repos.Repository](ctx, connectParameters) - require.NoError(t, err) +func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*datastore.Store[repos.Repository], error) { + s, err := Init(ctx, logging.Logger, false) - s, err := newStore(ctx, db, logging.Logger) if err != nil { return nil, err } @@ -47,7 +45,7 @@ func newTestStore(ctx context.Context, t *testing.T, connectParameters params.Co } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *Store) error { +func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { const query = ` DELETE FROM dss_metadata WHERE locality IS NOT NULL; ` diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 466e29acd..5662e3be1 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -1,11 +1,20 @@ package store import ( + "context" + "github.com/interuss/dss/pkg/aux_/repos" + auxdatastore "github.com/interuss/dss/pkg/aux_/store/datastore" dssstore "github.com/interuss/dss/pkg/store" + "go.uber.org/zap" ) // aux_.store.Store is a generic means to obtain an aux Repository (repo containing auxiliary // information not related to standardized services like RID or SCD specifically) to perform // aux-specific operations on any type of data backing the DSS may ever use. type Store = dssstore.Store[repos.Repository] + +// Init selects and initializes the aux store backend. +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { + return auxdatastore.Init(ctx, logger, withCheckCron) +} diff --git a/pkg/datastore/store.go b/pkg/datastore/store.go index ad5a100e2..33dcaf383 100644 --- a/pkg/datastore/store.go +++ b/pkg/datastore/store.go @@ -38,38 +38,13 @@ type Store[R any] struct { maxRetries int } -func NewStore[R any](ctx context.Context, db *Store[R], maxRetries int, crdbExpected int64, ybExpected int64, newRepo func(dsssql.Queryable) R) (Store[R], error) { - - dbName := db.Pool.Config().ConnConfig.Database - - vs, err := db.GetSchemaVersion(ctx, dbName) - if err != nil { - return Store[R]{}, stacktrace.Propagate(err, "Failed to get schema version for %s", dbName) - } - - if err := checkMajorSchemaVersion(ctx, db, vs, crdbExpected, ybExpected); err != nil { - return Store[R]{}, err - } - - return Store[R]{ - Pool: db.Pool, - Version: db.Version, - Clock: clockwork.NewRealClock(), - schemaVersion: vs, - newRepo: newRepo, - maxRetries: maxRetries, - }, nil -} - -func (s *Store[R]) Interact(_ context.Context) (R, error) { - return s.newRepo(s.Pool), nil -} - -func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error { - ctx = crdb.WithMaxRetries(ctx, s.maxRetries) - return crdbpgx.ExecuteTx(ctx, s.Pool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { - return f(ctx, s.newRepo(tx)) - }) +// Config describes everything a SQL-backed store needs to be initialized for a +// given specific package (rid, scd, aux, ...). +type Config[R any] struct { + DBName string + CrdbMajorSchemaVersion int64 + YbMajorSchemaVersion int64 + NewRepo func(q dsssql.Queryable, clock clockwork.Clock, v *Version) R } func checkMajorSchemaVersion[R any](ctx context.Context, db *Store[R], vs *semver.Version, crdbExpected int64, ybExpected int64) error { @@ -85,11 +60,6 @@ func checkMajorSchemaVersion[R any](ctx context.Context, db *Store[R], vs *semve return nil } -func (s *Store[R]) Close() error { - s.Pool.Close() - return nil -} - func checkDatabase[R any](ctx context.Context, db *Store[R], databaseName string) { logger := logging.WithValuesFromContext(ctx, logging.Logger) statsPtr := db.Pool.Stat() @@ -100,47 +70,6 @@ func checkDatabase[R any](ctx context.Context, db *Store[R], databaseName string } } -func DialStore[R any, S any](ctx context.Context, dbName string, withCheckCron bool, newStore func(*Store[R]) (S, error)) (S, error) { - - var zero S - - cp := params.GetConnectParameters() - cp.DBName = dbName - - db, err := Dial[R](ctx, cp) - - if err != nil { - if strings.Contains(err.Error(), "connect: connection refused") { - return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to datastore server for %s", dbName) - } - return zero, stacktrace.Propagate(err, "Failed to connect to %s database", dbName) - } - s, err := newStore(db) - if err != nil { - db.Pool.Close() - if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), fmt.Sprintf("database \"%s\" does not exist", dbName)) || strings.Contains(err.Error(), "database has not been bootstrapped with Schema Manager") { - return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to create %s store", dbName) - } - return zero, stacktrace.Propagate(err, "Failed to create %s store", dbName) - } - - if withCheckCron { - c := cron.New() - if _, err := c.AddFunc("@every 1m", func() { checkDatabase(ctx, db, dbName) }); err != nil { - db.Pool.Close() - return zero, stacktrace.Propagate(err, "Failed to schedule db check for %s", dbName) - } - c.Start() - - go func() { - <-ctx.Done() - c.Stop() - }() - } - - return s, nil -} - func Dial[R any](ctx context.Context, connParams params.ConnectParameters) (*Store[R], error) { dsn, err := connParams.BuildDSN() if err != nil { @@ -189,6 +118,70 @@ func Dial[R any](ctx context.Context, connParams params.ConnectParameters) (*Sto return nil, stacktrace.NewError("%s is not implemented yet", version.Type) } +// Init dials the database described by the global connect parameters (plus +// cfg.DBName), checks its schema version, and returns a ready-to-use Store[R]. +// If withCheckCron is true, a periodic health-check cron is started. +func Init[R any](ctx context.Context, cfg Config[R], withCheckCron bool) (*Store[R], error) { + cp := params.GetConnectParameters() + cp.DBName = cfg.DBName + + db, err := Dial[R](ctx, cp) + if err != nil { + if strings.Contains(err.Error(), "connect: connection refused") { + return nil, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to datastore server for %s", cfg.DBName) + } + return nil, stacktrace.Propagate(err, "Failed to connect to %s database", cfg.DBName) + } + + vs, err := db.GetSchemaVersion(ctx, cfg.DBName) + + if err == nil { + err = checkMajorSchemaVersion(ctx, db, vs, cfg.CrdbMajorSchemaVersion, cfg.YbMajorSchemaVersion) + } + if err != nil { + db.Pool.Close() + if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), fmt.Sprintf("database \"%s\" does not exist", cfg.DBName)) || strings.Contains(err.Error(), "database has not been bootstrapped with Schema Manager") { + return nil, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to create %s store", cfg.DBName) + } + return nil, stacktrace.Propagate(err, "Failed to create %s store", cfg.DBName) + } + + db.Clock = clockwork.NewRealClock() + db.schemaVersion = vs + db.maxRetries = cp.MaxRetries + db.newRepo = func(q dsssql.Queryable) R { + return cfg.NewRepo(q, db.Clock, db.Version) + } + + if withCheckCron { + c := cron.New() + if _, err := c.AddFunc("@every 1m", func() { checkDatabase(ctx, db, cfg.DBName) }); err != nil { + db.Pool.Close() + return nil, stacktrace.Propagate(err, "Failed to schedule db check for %s", cfg.DBName) + } + c.Start() + go func() { <-ctx.Done(); c.Stop() }() + } + + return db, nil +} + +func (s *Store[R]) Interact(_ context.Context) (R, error) { + return s.newRepo(s.Pool), nil +} + +func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error { + ctx = crdb.WithMaxRetries(ctx, s.maxRetries) + return crdbpgx.ExecuteTx(ctx, s.Pool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { + return f(ctx, s.newRepo(tx)) + }) +} + +func (s *Store[R]) Close() error { + s.Pool.Close() + return nil +} + func (s *Store[R]) CreateDatabase(ctx context.Context, dbName string) error { createDB := fmt.Sprintf("CREATE DATABASE %s", dbName) if _, err := s.Pool.Exec(ctx, createDB); err != nil { diff --git a/pkg/rid/application/application_test.go b/pkg/rid/application/application_test.go index 064558a3c..9829bfd3d 100644 --- a/pkg/rid/application/application_test.go +++ b/pkg/rid/application/application_test.go @@ -61,13 +61,10 @@ func setUpStore(ctx context.Context, t *testing.T, logger *zap.Logger) (store.St connectParameters.DBName = "rid" - ridDatastore, err := datastore.Dial[repos.Repository](ctx, connectParameters) + store, err := ridc.Init(ctx, logger, false) require.NoError(t, err) logger.Info("using datastore.") - store, err := ridc.NewStore(ctx, ridDatastore, logger) - require.NoError(t, err) - store.Clock = fakeClock return store, func() { @@ -77,7 +74,7 @@ func setUpStore(ctx context.Context, t *testing.T, logger *zap.Logger) (store.St } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *ridc.Store) error { +func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { const query = ` DELETE FROM subscriptions WHERE id IS NOT NULL; DELETE FROM identification_service_areas WHERE id IS NOT NULL;` diff --git a/pkg/rid/store/datastore/store.go b/pkg/rid/store/datastore/store.go index 331375445..40a6ed0ea 100644 --- a/pkg/rid/store/datastore/store.go +++ b/pkg/rid/store/datastore/store.go @@ -3,7 +3,6 @@ package datastore import ( "context" - "github.com/interuss/dss/pkg/datastore/params" dssql "github.com/interuss/dss/pkg/sql" "github.com/interuss/dss/pkg/datastore" @@ -27,35 +26,19 @@ type repo struct { logger *zap.Logger } -// rid.store.datastore.Store is a a full implementation of store.Store[rid.repos.Repository] -// for data backings that use a database such as CockroachDB or YugabyteDB. -type Store struct { - datastore.Store[repos.Repository] -} - -func NewStore(ctx context.Context, db *datastore.Store[repos.Repository], logger *zap.Logger) (*Store, error) { - - s := &Store{} - - base, err := datastore.NewStore(ctx, db, params.GetConnectParameters().MaxRetries, currentCrdbMajorSchemaVersion, currentYugabyteMajorSchemaVersion, func(q dssql.Queryable) repos.Repository { - return &repo{ - Queryable: q, - clock: s.Clock, - logger: logging.WithValuesFromContext(ctx, logger), - } - }) - if err != nil { - return nil, err - } - s.Store = base - return s, nil -} - -func Dial(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*Store, error) { - - store, err := datastore.DialStore(ctx, "rid", withCheckCron, func(db *datastore.Store[repos.Repository]) (*Store, error) { - return NewStore(ctx, db, logger) - }) - - return store, err +// Init initializes the SQL-backed rid store. It return a concrete datastore.Store[rid.repos.Repository] providing the +// ability to interact with a database-backed store of rid information. +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*datastore.Store[repos.Repository], error) { + return datastore.Init(ctx, datastore.Config[repos.Repository]{ + DBName: "rid", + CrdbMajorSchemaVersion: currentCrdbMajorSchemaVersion, + YbMajorSchemaVersion: currentYugabyteMajorSchemaVersion, + NewRepo: func(q dssql.Queryable, clock clockwork.Clock, _ *datastore.Version) repos.Repository { + return &repo{ + Queryable: q, + clock: clock, + logger: logging.WithValuesFromContext(ctx, logger), + } + }, + }, withCheckCron) } diff --git a/pkg/rid/store/datastore/store_test.go b/pkg/rid/store/datastore/store_test.go index 3566cc0ac..d6531c0b5 100644 --- a/pkg/rid/store/datastore/store_test.go +++ b/pkg/rid/store/datastore/store_test.go @@ -25,7 +25,7 @@ var ( writer = "writer" ) -func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { +func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repository], func()) { connectParameters := params.GetConnectParameters() if connectParameters.Host == "" || connectParameters.Port == 0 { t.Skip() @@ -42,11 +42,9 @@ func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { } } -func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*Store, error) { - db, err := datastore.Dial[repos.Repository](ctx, connectParameters) - require.NoError(t, err) +func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*datastore.Store[repos.Repository], error) { + s, err := Init(ctx, logging.Logger, false) - s, err := NewStore(ctx, db, logging.Logger) if err != nil { return nil, err } @@ -56,7 +54,7 @@ func newTestStore(ctx context.Context, t *testing.T, connectParameters params.Co } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *Store) error { +func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { const query = ` DELETE FROM subscriptions WHERE id IS NOT NULL; DELETE FROM identification_service_areas WHERE id IS NOT NULL;` diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index b2d1e9e5b..46bbdea14 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -1,10 +1,19 @@ package store import ( + "context" + "github.com/interuss/dss/pkg/rid/repos" + riddatastore "github.com/interuss/dss/pkg/rid/store/datastore" dssstore "github.com/interuss/dss/pkg/store" + "go.uber.org/zap" ) // rid.store.Store is a generic means to obtain an RID rid.repos.Repository to perform RID-specific // operations on any type of data backing the DSS may ever use. type Store = dssstore.Store[repos.Repository] + +// Init selects and initializes the rid store backend. +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { + return riddatastore.Init(ctx, logger, withCheckCron) +} diff --git a/pkg/scd/store/datastore/store.go b/pkg/scd/store/datastore/store.go index 20ea5a794..3b48fd3ba 100644 --- a/pkg/scd/store/datastore/store.go +++ b/pkg/scd/store/datastore/store.go @@ -3,7 +3,6 @@ package datastore import ( "context" - "github.com/interuss/dss/pkg/datastore/params" dssql "github.com/interuss/dss/pkg/sql" "github.com/interuss/dss/pkg/datastore" @@ -28,36 +27,20 @@ type repo struct { globalLock bool } -// scd.store.datastore.Store is a a full implementation of store.Store[scd.repos.Repository] -// for data backings that use a database such as CockroachDB or YugabyteDB. -type Store struct { - datastore.Store[repos.Repository] -} - -func newStore(ctx context.Context, db *datastore.Store[repos.Repository], logger *zap.Logger, globalLock bool) (*Store, error) { - - s := &Store{} - - base, err := datastore.NewStore(ctx, db, params.GetConnectParameters().MaxRetries, currentCrdbMajorSchemaVersion, currentYugabyteMajorSchemaVersion, func(q dssql.Queryable) repos.Repository { - return &repo{ - q: q, - clock: s.Clock, - logger: logging.WithValuesFromContext(ctx, logger), - globalLock: globalLock, - } - }) - if err != nil { - return nil, err - } - s.Store = base - return s, nil -} - -func Dial(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (*Store, error) { - - store, err := datastore.DialStore(ctx, "scd", withCheckCron, func(db *datastore.Store[repos.Repository]) (*Store, error) { - return newStore(ctx, db, logger, globalLock) - }) - - return store, err +// Init initializes the SQL-backed sid store. It return a concrete datastore.Store[sid.repos.Repository] providing the +// ability to interact with a database-backed store of sid information. +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (*datastore.Store[repos.Repository], error) { + return datastore.Init(ctx, datastore.Config[repos.Repository]{ + DBName: "scd", + CrdbMajorSchemaVersion: currentCrdbMajorSchemaVersion, + YbMajorSchemaVersion: currentYugabyteMajorSchemaVersion, + NewRepo: func(q dssql.Queryable, clock clockwork.Clock, _ *datastore.Version) repos.Repository { + return &repo{ + q: q, + clock: clock, + logger: logging.WithValuesFromContext(ctx, logger), + globalLock: globalLock, + } + }, + }, withCheckCron) } diff --git a/pkg/scd/store/datastore/store_test.go b/pkg/scd/store/datastore/store_test.go index 850a0c1fa..f86218a7b 100644 --- a/pkg/scd/store/datastore/store_test.go +++ b/pkg/scd/store/datastore/store_test.go @@ -16,7 +16,7 @@ var ( fakeClock = clockwork.NewFakeClock() ) -func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { +func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repository], func()) { connectParameters := params.GetConnectParameters() if connectParameters.Host == "" || connectParameters.Port == 0 { t.Skip() @@ -33,11 +33,9 @@ func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { } } -func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*Store, error) { - db, err := datastore.Dial[repos.Repository](ctx, connectParameters) - require.NoError(t, err) +func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*datastore.Store[repos.Repository], error) { + s, err := Init(ctx, logging.Logger, false, false) - s, err := newStore(ctx, db, logging.Logger, false) if err != nil { return nil, err } @@ -47,7 +45,7 @@ func newTestStore(ctx context.Context, t *testing.T, connectParameters params.Co } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *Store) error { +func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { const query = ` DELETE FROM scd_subscriptions WHERE id IS NOT NULL; DELETE FROM scd_operations WHERE id IS NOT NULL; diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index 9d6c9fa68..0ccffecb6 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -1,10 +1,19 @@ package store import ( + "context" + "github.com/interuss/dss/pkg/scd/repos" + scddatastore "github.com/interuss/dss/pkg/scd/store/datastore" dssstore "github.com/interuss/dss/pkg/store" + "go.uber.org/zap" ) // scd.store.Store is a generic means to obtain an SCD scd.repos.Repository to perform SCD-specific // operations on any type of data backing the DSS may ever use. type Store = dssstore.Store[repos.Repository] + +// Init selects and initializes the scd store backend. +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (Store, error) { + return scddatastore.Init(ctx, logger, withCheckCron, globalLock) +} From ad9fe57c9e715f272d69d6ecf2815f7d83d73010 Mon Sep 17 00:00:00 2001 From: Maximilien Cuony Date: Wed, 22 Apr 2026 14:32:46 +0200 Subject: [PATCH 3/6] [store] Rename datastore to sqlstore --- Makefile | 6 +++--- build/db_schemas/README.md | 2 +- build/db_schemas/update_latest_version.sh | 14 +++++++------- cmds/core-service/README.md | 2 +- cmds/core-service/main.go | 4 ++-- cmds/db-manager/migration/migrate.go | 12 ++++++------ pkg/aux_/accepted_ca_certs.go | 2 +- pkg/aux_/instance_ca_certs.go | 2 +- pkg/aux_/store/{datastore => sqlstore}/doc.go | 4 ++-- pkg/aux_/store/{datastore => sqlstore}/dss.go | 12 ++++++------ pkg/aux_/store/{datastore => sqlstore}/store.go | 16 ++++++++-------- .../store/{datastore => sqlstore}/store_test.go | 12 ++++++------ pkg/aux_/store/store.go | 4 ++-- pkg/datastore/doc.go | 3 --- pkg/rid/application/application_test.go | 10 +++++----- pkg/rid/store/datastore/doc.go | 3 --- pkg/rid/store/sqlstore/doc.go | 3 +++ .../identification_service_area.go | 2 +- .../identification_service_area_test.go | 2 +- pkg/rid/store/{datastore => sqlstore}/store.go | 16 ++++++++-------- .../store/{datastore => sqlstore}/store_test.go | 12 ++++++------ .../{datastore => sqlstore}/subscriptions.go | 2 +- .../subscriptions_test.go | 2 +- pkg/rid/store/store.go | 4 ++-- pkg/scd/store/datastore/doc.go | 3 --- .../{datastore => sqlstore}/availability.go | 2 +- .../store/{datastore => sqlstore}/constraints.go | 2 +- pkg/scd/store/sqlstore/doc.go | 3 +++ .../operational_intents.go | 2 +- .../operational_intents_test.go | 2 +- pkg/scd/store/{datastore => sqlstore}/store.go | 16 ++++++++-------- .../store/{datastore => sqlstore}/store_test.go | 12 ++++++------ .../{datastore => sqlstore}/subscriptions.go | 2 +- .../subscriptions_test.go | 2 +- pkg/scd/store/store.go | 4 ++-- pkg/sqlstore/doc.go | 3 +++ pkg/{datastore => sqlstore}/params/params.go | 0 .../params/params_test.go | 0 pkg/{datastore => sqlstore}/store.go | 10 +++++----- pkg/{datastore => sqlstore}/version.go | 5 +++-- 40 files changed, 110 insertions(+), 109 deletions(-) rename pkg/aux_/store/{datastore => sqlstore}/doc.go (55%) rename pkg/aux_/store/{datastore => sqlstore}/dss.go (96%) rename pkg/aux_/store/{datastore => sqlstore}/store.go (72%) rename pkg/aux_/store/{datastore => sqlstore}/store_test.go (74%) delete mode 100644 pkg/datastore/doc.go delete mode 100644 pkg/rid/store/datastore/doc.go create mode 100644 pkg/rid/store/sqlstore/doc.go rename pkg/rid/store/{datastore => sqlstore}/identification_service_area.go (99%) rename pkg/rid/store/{datastore => sqlstore}/identification_service_area_test.go (99%) rename pkg/rid/store/{datastore => sqlstore}/store.go (65%) rename pkg/rid/store/{datastore => sqlstore}/store_test.go (94%) rename pkg/rid/store/{datastore => sqlstore}/subscriptions.go (99%) rename pkg/rid/store/{datastore => sqlstore}/subscriptions_test.go (99%) delete mode 100644 pkg/scd/store/datastore/doc.go rename pkg/scd/store/{datastore => sqlstore}/availability.go (99%) rename pkg/scd/store/{datastore => sqlstore}/constraints.go (99%) create mode 100644 pkg/scd/store/sqlstore/doc.go rename pkg/scd/store/{datastore => sqlstore}/operational_intents.go (99%) rename pkg/scd/store/{datastore => sqlstore}/operational_intents_test.go (99%) rename pkg/scd/store/{datastore => sqlstore}/store.go (66%) rename pkg/scd/store/{datastore => sqlstore}/store_test.go (76%) rename pkg/scd/store/{datastore => sqlstore}/subscriptions.go (99%) rename pkg/scd/store/{datastore => sqlstore}/subscriptions_test.go (99%) create mode 100644 pkg/sqlstore/doc.go rename pkg/{datastore => sqlstore}/params/params.go (100%) rename pkg/{datastore => sqlstore}/params/params_test.go (100%) rename pkg/{datastore => sqlstore}/store.go (96%) rename pkg/{datastore => sqlstore}/version.go (98%) diff --git a/Makefile b/Makefile index c763d158a..f9b95ef68 100644 --- a/Makefile +++ b/Makefile @@ -156,10 +156,10 @@ test-go-units-crdb: cleanup-test-go-units-crdb go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/rid --db_version latest --datastore_host localhost go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/scd --db_version latest --datastore_host localhost go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/aux_ --db_version latest --datastore_host localhost - go test -cover -count=1 -v ./pkg/rid/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR) + go test -cover -count=1 -v ./pkg/rid/store/sqlstore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR) go test -cover -count=1 -v ./pkg/rid/application --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR) - go test -cover -count=1 -v ./pkg/scd/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR) - go test -cover -count=1 -v ./pkg/aux_/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR) + go test -cover -count=1 -v ./pkg/scd/store/sqlstore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR) + go test -cover -count=1 -v ./pkg/aux_/store/sqlstore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR) @docker stop dss-crdb-for-testing > /dev/null @docker rm dss-crdb-for-testing > /dev/null diff --git a/build/db_schemas/README.md b/build/db_schemas/README.md index 8b582f17f..3c07b6da0 100644 --- a/build/db_schemas/README.md +++ b/build/db_schemas/README.md @@ -21,7 +21,7 @@ places: * [DSS main.jsonnet](../../deploy/services/tanka/examples/minimum/main.jsonnet) * [Schema manager main.jsonnet](../../deploy/services/tanka/examples/schema_manager/main.jsonnet) * [Minikube main.jsonnet](../../deploy/services/tanka/examples/minikube/main.jsonnet) -* /pkg/{rid|scd|aux_}/store/datastore/store.go +* /pkg/{rid|scd|aux_}/store/sqlstore/store.go * /deploy/infrastructure/dependencies/terraform-commons-dss/default_latest.tf * /deploy/services/helm-charts/dss/templates/schema-manager.yaml diff --git a/build/db_schemas/update_latest_version.sh b/build/db_schemas/update_latest_version.sh index 2a0dfe37d..2d3886875 100755 --- a/build/db_schemas/update_latest_version.sh +++ b/build/db_schemas/update_latest_version.sh @@ -73,12 +73,12 @@ YBDB_RID_MAJOR=$(echo "$YBDB_RID" | cut -d. -f1) YBDB_SCD_MAJOR=$(echo "$YBDB_SCD" | cut -d. -f1) AUX_MAJOR=$(echo "$AUX" | cut -d. -f1) -# Replace major versions in datastore files -sed -i -E "s/(currentCrdbMajorSchemaVersion.*= )[0-9]/\1$CRDB_SCD_MAJOR/" "${BASEDIR}/pkg/scd/store/datastore/store.go" -sed -i -E "s/(currentYugabyteMajorSchemaVersion.*= )[0-9]/\1$YBDB_SCD_MAJOR/" "${BASEDIR}/pkg/scd/store/datastore/store.go" +# Replace major versions in sqlstore files +sed -i -E "s/(currentCrdbMajorSchemaVersion.*= )[0-9]/\1$CRDB_SCD_MAJOR/" "${BASEDIR}/pkg/scd/store/sqlstore/store.go" +sed -i -E "s/(currentYugabyteMajorSchemaVersion.*= )[0-9]/\1$YBDB_SCD_MAJOR/" "${BASEDIR}/pkg/scd/store/sqlstore/store.go" -sed -i -E "s/(currentCrdbMajorSchemaVersion.*= )[0-9]/\1$CRDB_RID_MAJOR/" "${BASEDIR}/pkg/rid/store/datastore/store.go" -sed -i -E "s/(currentYugabyteMajorSchemaVersion.*= )[0-9]/\1$YBDB_RID_MAJOR/" "${BASEDIR}/pkg/rid/store/datastore/store.go" +sed -i -E "s/(currentCrdbMajorSchemaVersion.*= )[0-9]/\1$CRDB_RID_MAJOR/" "${BASEDIR}/pkg/rid/store/sqlstore/store.go" +sed -i -E "s/(currentYugabyteMajorSchemaVersion.*= )[0-9]/\1$YBDB_RID_MAJOR/" "${BASEDIR}/pkg/rid/store/sqlstore/store.go" -sed -i -E "s/(currentCrdbMajorSchemaVersion.*= )[0-9]/\1$AUX_MAJOR/" "${BASEDIR}/pkg/aux_/store/datastore/store.go" -sed -i -E "s/(currentYugabyteMajorSchemaVersion.*= )[0-9]/\1$AUX_MAJOR/" "${BASEDIR}/pkg/aux_/store/datastore/store.go" +sed -i -E "s/(currentCrdbMajorSchemaVersion.*= )[0-9]/\1$AUX_MAJOR/" "${BASEDIR}/pkg/aux_/store/sqlstore/store.go" +sed -i -E "s/(currentYugabyteMajorSchemaVersion.*= )[0-9]/\1$AUX_MAJOR/" "${BASEDIR}/pkg/aux_/store/sqlstore/store.go" diff --git a/cmds/core-service/README.md b/cmds/core-service/README.md index 7c17d9970..184b7b4df 100644 --- a/cmds/core-service/README.md +++ b/cmds/core-service/README.md @@ -29,7 +29,7 @@ go run ./cmds/core-service \ #### CockroachDB cluster -To run correctly, core-service must be able to [access](../../pkg/datastore/params/params.go) a CockroachDB or a Yugabyte cluster. Provision of this cluster is handled automatically for a local development environment if following [the instructions for a standalone instance](../../build/dev/standalone_instance.md). +To run correctly, core-service must be able to [access](../../pkg/sqlstore/params/params.go) a CockroachDB or a Yugabyte cluster. Provision of this cluster is handled automatically for a local development environment if following [the instructions for a standalone instance](../../build/dev/standalone_instance.md). Alternatively, a CockroachDB instance can be created manually with: diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index fa5f83e95..212d6ddbb 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -23,7 +23,6 @@ import ( aux "github.com/interuss/dss/pkg/aux_" auxs "github.com/interuss/dss/pkg/aux_/store" "github.com/interuss/dss/pkg/build" - "github.com/interuss/dss/pkg/datastore" "github.com/interuss/dss/pkg/logging" "github.com/interuss/dss/pkg/rid/application" rid_v1 "github.com/interuss/dss/pkg/rid/server/v1" @@ -31,6 +30,7 @@ import ( rids "github.com/interuss/dss/pkg/rid/store" "github.com/interuss/dss/pkg/scd" scds "github.com/interuss/dss/pkg/scd/store" + "github.com/interuss/dss/pkg/sqlstore" "github.com/interuss/dss/pkg/version" "github.com/interuss/dss/pkg/versioning" "github.com/interuss/stacktrace" @@ -350,7 +350,7 @@ func main() { backoff := 0 for { if err := RunHTTPServer(ctx, cancel, *address, *locality); err != nil { - if stacktrace.GetCode(err) == datastore.CodeRetryable { + if stacktrace.GetCode(err) == sqlstore.CodeRetryable { logger.Info(fmt.Sprintf("Prerequisites not yet satisfied; waiting %.fs to retry...", backoffs[backoff].Seconds()), zap.Error(err)) time.Sleep(backoffs[backoff]) if backoff < len(backoffs)-1 { diff --git a/cmds/db-manager/migration/migrate.go b/cmds/db-manager/migration/migrate.go index 11c6acc1c..503ad8785 100644 --- a/cmds/db-manager/migration/migrate.go +++ b/cmds/db-manager/migration/migrate.go @@ -11,8 +11,8 @@ import ( "time" "github.com/coreos/go-semver/semver" - "github.com/interuss/dss/pkg/datastore" - "github.com/interuss/dss/pkg/datastore/params" + "github.com/interuss/dss/pkg/sqlstore" + "github.com/interuss/dss/pkg/sqlstore/params" "github.com/interuss/stacktrace" "github.com/spf13/cobra" @@ -97,8 +97,8 @@ func migrate(cmd *cobra.Command, _ []string) error { log.Printf("Datastore server type and version: %s@%s", ds.Version.Type, ds.Version.SemVer.String()) var ( - isCockroach = ds.Version.Type == datastore.CockroachDB - isYugabyte = ds.Version.Type == datastore.Yugabyte + isCockroach = ds.Version.Type == sqlstore.CockroachDB + isYugabyte = ds.Version.Type == sqlstore.Yugabyte ) // Make sure specified database exists @@ -225,11 +225,11 @@ func migrate(cmd *cobra.Command, _ []string) error { return nil } -func connectTo(ctx context.Context, dbName string) (*datastore.Store[any], error) { +func connectTo(ctx context.Context, dbName string) (*sqlstore.Store[any], error) { // Connect to database server connectParameters := params.GetConnectParameters() connectParameters.DBName = dbName - return datastore.Dial[any](ctx, connectParameters) + return sqlstore.Dial[any](ctx, connectParameters) } func enumerateMigrationSteps(path *string) ([]MigrationStep, error) { diff --git a/pkg/aux_/accepted_ca_certs.go b/pkg/aux_/accepted_ca_certs.go index eb082ef12..b35a56d39 100644 --- a/pkg/aux_/accepted_ca_certs.go +++ b/pkg/aux_/accepted_ca_certs.go @@ -7,7 +7,7 @@ import ( "strings" restapi "github.com/interuss/dss/pkg/api/auxv1" - "github.com/interuss/dss/pkg/datastore/params" + "github.com/interuss/dss/pkg/sqlstore/params" ) func (a *Server) GetAcceptedCAs(ctx context.Context, req *restapi.GetAcceptedCAsRequest) restapi.GetAcceptedCAsResponseSet { diff --git a/pkg/aux_/instance_ca_certs.go b/pkg/aux_/instance_ca_certs.go index 31294d242..ae4ef9192 100644 --- a/pkg/aux_/instance_ca_certs.go +++ b/pkg/aux_/instance_ca_certs.go @@ -7,7 +7,7 @@ import ( "strings" restapi "github.com/interuss/dss/pkg/api/auxv1" - "github.com/interuss/dss/pkg/datastore/params" + "github.com/interuss/dss/pkg/sqlstore/params" ) func (a *Server) GetInstanceCAs(ctx context.Context, req *restapi.GetInstanceCAsRequest) restapi.GetInstanceCAsResponseSet { diff --git a/pkg/aux_/store/datastore/doc.go b/pkg/aux_/store/sqlstore/doc.go similarity index 55% rename from pkg/aux_/store/datastore/doc.go rename to pkg/aux_/store/sqlstore/doc.go index 8cfdd63f2..c1d46cd04 100644 --- a/pkg/aux_/store/datastore/doc.go +++ b/pkg/aux_/store/sqlstore/doc.go @@ -1,4 +1,4 @@ -// Package aux_.store.datastore provides a concrete implementation of a +// Package aux_.store.sqlstore provides a concrete implementation of a // store.Store[aux_.repos.Repository] using a database as a data backing (such as CockroachDB or // YugabyteDB). -package datastore +package sqlstore diff --git a/pkg/aux_/store/datastore/dss.go b/pkg/aux_/store/sqlstore/dss.go similarity index 96% rename from pkg/aux_/store/datastore/dss.go rename to pkg/aux_/store/sqlstore/dss.go index e5b22c52e..eb81196ac 100644 --- a/pkg/aux_/store/datastore/dss.go +++ b/pkg/aux_/store/sqlstore/dss.go @@ -1,12 +1,12 @@ -package datastore +package sqlstore import ( "context" "time" auxmodels "github.com/interuss/dss/pkg/aux_/models" - "github.com/interuss/dss/pkg/datastore" dsserr "github.com/interuss/dss/pkg/errors" + "github.com/interuss/dss/pkg/sqlstore" "github.com/interuss/stacktrace" ) @@ -145,16 +145,16 @@ func (r *repo) RecordHeartbeat(ctx context.Context, heartbeat auxmodels.Heartbea } -// GetDSSAirspaceRepresentationID gets the ID of the common DSS Airspace Representation the Datastore represents. +// GetDSSAirspaceRepresentationID gets the ID of the common DSS Airspace Representation the sqlstore represents. func (r *repo) GetDSSAirspaceRepresentationID(ctx context.Context) (string, error) { switch r.version.Type { - case datastore.CockroachDB: + case sqlstore.CockroachDB: var darID string if err := r.QueryRow(ctx, "SELECT crdb_internal.cluster_id()").Scan(&darID); err != nil { return darID, stacktrace.Propagate(err, "Error getting CockroachDB cluster ID") } return darID, nil - case datastore.Yugabyte: + case sqlstore.Yugabyte: var darID string @@ -173,6 +173,6 @@ func (r *repo) GetDSSAirspaceRepresentationID(ctx context.Context) (string, erro return darID, nil default: - return "", stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDSSAirspaceRepresentationID is not yet supported in current Datastore type '%s'", r.version.Type) + return "", stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDSSAirspaceRepresentationID is not yet supported in current sqlstore type '%s'", r.version.Type) } } diff --git a/pkg/aux_/store/datastore/store.go b/pkg/aux_/store/sqlstore/store.go similarity index 72% rename from pkg/aux_/store/datastore/store.go rename to pkg/aux_/store/sqlstore/store.go index 17eb07480..11c5f5e7a 100644 --- a/pkg/aux_/store/datastore/store.go +++ b/pkg/aux_/store/sqlstore/store.go @@ -1,18 +1,18 @@ -package datastore +package sqlstore import ( "context" "github.com/interuss/dss/pkg/aux_/repos" - "github.com/interuss/dss/pkg/datastore" "github.com/interuss/dss/pkg/logging" dssql "github.com/interuss/dss/pkg/sql" + "github.com/interuss/dss/pkg/sqlstore" "github.com/jonboulle/clockwork" "go.uber.org/zap" ) const ( - // The current major schema version per datastore type. + // The current major schema version per sqlstore type. currentCrdbMajorSchemaVersion = 1 currentYugabyteMajorSchemaVersion = 1 ) @@ -21,17 +21,17 @@ type repo struct { dssql.Queryable clock clockwork.Clock logger *zap.Logger - version *datastore.Version + version *sqlstore.Version } -// Init initializes the SQL-backed rid store. It return a concrete datastore.Store[aux_.repos.Repository] providing the +// Init initializes the SQL-backed rid store. It return a concrete sqlstore.Store[aux_.repos.Repository] providing the // ability to interact with a database-backed store of aux information. -func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*datastore.Store[repos.Repository], error) { - return datastore.Init(ctx, datastore.Config[repos.Repository]{ +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*sqlstore.Store[repos.Repository], error) { + return sqlstore.Init(ctx, sqlstore.Config[repos.Repository]{ DBName: "aux", CrdbMajorSchemaVersion: currentCrdbMajorSchemaVersion, YbMajorSchemaVersion: currentYugabyteMajorSchemaVersion, - NewRepo: func(q dssql.Queryable, clock clockwork.Clock, version *datastore.Version) repos.Repository { + NewRepo: func(q dssql.Queryable, clock clockwork.Clock, version *sqlstore.Version) repos.Repository { return &repo{ Queryable: q, clock: clock, diff --git a/pkg/aux_/store/datastore/store_test.go b/pkg/aux_/store/sqlstore/store_test.go similarity index 74% rename from pkg/aux_/store/datastore/store_test.go rename to pkg/aux_/store/sqlstore/store_test.go index 62513a3f4..8519d4537 100644 --- a/pkg/aux_/store/datastore/store_test.go +++ b/pkg/aux_/store/sqlstore/store_test.go @@ -1,13 +1,13 @@ -package datastore +package sqlstore import ( "context" "testing" "github.com/interuss/dss/pkg/aux_/repos" - "github.com/interuss/dss/pkg/datastore" - "github.com/interuss/dss/pkg/datastore/params" "github.com/interuss/dss/pkg/logging" + "github.com/interuss/dss/pkg/sqlstore" + "github.com/interuss/dss/pkg/sqlstore/params" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) @@ -16,7 +16,7 @@ var ( fakeClock = clockwork.NewFakeClock() ) -func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repository], func()) { +func setUpStore(ctx context.Context, t *testing.T) (*sqlstore.Store[repos.Repository], func()) { connectParameters := params.GetConnectParameters() if connectParameters.Host == "" || connectParameters.Port == 0 { t.Skip() @@ -33,7 +33,7 @@ func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repos } } -func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*datastore.Store[repos.Repository], error) { +func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*sqlstore.Store[repos.Repository], error) { s, err := Init(ctx, logging.Logger, false) if err != nil { @@ -45,7 +45,7 @@ func newTestStore(ctx context.Context, t *testing.T, connectParameters params.Co } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { +func cleanUp(ctx context.Context, s *sqlstore.Store[repos.Repository]) error { const query = ` DELETE FROM dss_metadata WHERE locality IS NOT NULL; ` diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 5662e3be1..481fb62e1 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -4,7 +4,7 @@ import ( "context" "github.com/interuss/dss/pkg/aux_/repos" - auxdatastore "github.com/interuss/dss/pkg/aux_/store/datastore" + auxsqlstore "github.com/interuss/dss/pkg/aux_/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" "go.uber.org/zap" ) @@ -16,5 +16,5 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the aux store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { - return auxdatastore.Init(ctx, logger, withCheckCron) + return auxsqlstore.Init(ctx, logger, withCheckCron) } diff --git a/pkg/datastore/doc.go b/pkg/datastore/doc.go deleted file mode 100644 index 5376ee02b..000000000 --- a/pkg/datastore/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package datastore bundles up types and functions common to data backings that use a database -// (such as CRDB or Yugabyte). -package datastore diff --git a/pkg/rid/application/application_test.go b/pkg/rid/application/application_test.go index 9829bfd3d..0cc3fd885 100644 --- a/pkg/rid/application/application_test.go +++ b/pkg/rid/application/application_test.go @@ -5,14 +5,14 @@ import ( "testing" "time" - "github.com/interuss/dss/pkg/datastore" - "github.com/interuss/dss/pkg/datastore/params" dssmodels "github.com/interuss/dss/pkg/models" ridmodels "github.com/interuss/dss/pkg/rid/models" "github.com/interuss/dss/pkg/rid/repos" "github.com/interuss/dss/pkg/rid/store" - ridc "github.com/interuss/dss/pkg/rid/store/datastore" + ridc "github.com/interuss/dss/pkg/rid/store/sqlstore" dssql "github.com/interuss/dss/pkg/sql" + "github.com/interuss/dss/pkg/sqlstore" + "github.com/interuss/dss/pkg/sqlstore/params" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" @@ -63,7 +63,7 @@ func setUpStore(ctx context.Context, t *testing.T, logger *zap.Logger) (store.St store, err := ridc.Init(ctx, logger, false) require.NoError(t, err) - logger.Info("using datastore.") + logger.Info("using sqlstore.") store.Clock = fakeClock @@ -74,7 +74,7 @@ func setUpStore(ctx context.Context, t *testing.T, logger *zap.Logger) (store.St } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { +func cleanUp(ctx context.Context, s *sqlstore.Store[repos.Repository]) error { const query = ` DELETE FROM subscriptions WHERE id IS NOT NULL; DELETE FROM identification_service_areas WHERE id IS NOT NULL;` diff --git a/pkg/rid/store/datastore/doc.go b/pkg/rid/store/datastore/doc.go deleted file mode 100644 index 7995bc1d3..000000000 --- a/pkg/rid/store/datastore/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package rid.store.datastore provides a full implementation of store.Store[rid.repos.Repository] -// for data backings that use a database such as CockroachDB or YugabyteDB. -package datastore diff --git a/pkg/rid/store/sqlstore/doc.go b/pkg/rid/store/sqlstore/doc.go new file mode 100644 index 000000000..5c59b62c7 --- /dev/null +++ b/pkg/rid/store/sqlstore/doc.go @@ -0,0 +1,3 @@ +// Package rid.store.sqlstore provides a full implementation of store.Store[rid.repos.Repository] +// for data backings that use a database such as CockroachDB or YugabyteDB. +package sqlstore diff --git a/pkg/rid/store/datastore/identification_service_area.go b/pkg/rid/store/sqlstore/identification_service_area.go similarity index 99% rename from pkg/rid/store/datastore/identification_service_area.go rename to pkg/rid/store/sqlstore/identification_service_area.go index ad55aa904..cf502843c 100644 --- a/pkg/rid/store/datastore/identification_service_area.go +++ b/pkg/rid/store/sqlstore/identification_service_area.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/rid/store/datastore/identification_service_area_test.go b/pkg/rid/store/sqlstore/identification_service_area_test.go similarity index 99% rename from pkg/rid/store/datastore/identification_service_area_test.go rename to pkg/rid/store/sqlstore/identification_service_area_test.go index 863fd2e55..0eb329977 100644 --- a/pkg/rid/store/datastore/identification_service_area_test.go +++ b/pkg/rid/store/sqlstore/identification_service_area_test.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/rid/store/datastore/store.go b/pkg/rid/store/sqlstore/store.go similarity index 65% rename from pkg/rid/store/datastore/store.go rename to pkg/rid/store/sqlstore/store.go index 40a6ed0ea..6211ef3ee 100644 --- a/pkg/rid/store/datastore/store.go +++ b/pkg/rid/store/sqlstore/store.go @@ -1,24 +1,24 @@ -package datastore +package sqlstore import ( "context" dssql "github.com/interuss/dss/pkg/sql" - "github.com/interuss/dss/pkg/datastore" "github.com/interuss/dss/pkg/logging" "github.com/interuss/dss/pkg/rid/repos" + "github.com/interuss/dss/pkg/sqlstore" "github.com/jonboulle/clockwork" "go.uber.org/zap" ) const ( - // The current major schema version per datastore type. + // The current major schema version per sqlstore type. currentCrdbMajorSchemaVersion = 4 currentYugabyteMajorSchemaVersion = 1 ) -// rid.store.datastore.repo is a full implementation of rid.repos.Repository for data backings that +// rid.store.sqlstore.repo is a full implementation of rid.repos.Repository for data backings that // use a database such as CockroachDB or YugabyteDB. type repo struct { dssql.Queryable @@ -26,14 +26,14 @@ type repo struct { logger *zap.Logger } -// Init initializes the SQL-backed rid store. It return a concrete datastore.Store[rid.repos.Repository] providing the +// Init initializes the SQL-backed rid store. It return a concrete sqlstore.Store[rid.repos.Repository] providing the // ability to interact with a database-backed store of rid information. -func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*datastore.Store[repos.Repository], error) { - return datastore.Init(ctx, datastore.Config[repos.Repository]{ +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*sqlstore.Store[repos.Repository], error) { + return sqlstore.Init(ctx, sqlstore.Config[repos.Repository]{ DBName: "rid", CrdbMajorSchemaVersion: currentCrdbMajorSchemaVersion, YbMajorSchemaVersion: currentYugabyteMajorSchemaVersion, - NewRepo: func(q dssql.Queryable, clock clockwork.Clock, _ *datastore.Version) repos.Repository { + NewRepo: func(q dssql.Queryable, clock clockwork.Clock, _ *sqlstore.Version) repos.Repository { return &repo{ Queryable: q, clock: clock, diff --git a/pkg/rid/store/datastore/store_test.go b/pkg/rid/store/sqlstore/store_test.go similarity index 94% rename from pkg/rid/store/datastore/store_test.go rename to pkg/rid/store/sqlstore/store_test.go index d6531c0b5..275a1a332 100644 --- a/pkg/rid/store/datastore/store_test.go +++ b/pkg/rid/store/sqlstore/store_test.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" @@ -7,12 +7,12 @@ import ( "time" "github.com/google/uuid" - "github.com/interuss/dss/pkg/datastore" - "github.com/interuss/dss/pkg/datastore/params" "github.com/interuss/dss/pkg/logging" dssmodels "github.com/interuss/dss/pkg/models" ridmodels "github.com/interuss/dss/pkg/rid/models" "github.com/interuss/dss/pkg/rid/repos" + "github.com/interuss/dss/pkg/sqlstore" + "github.com/interuss/dss/pkg/sqlstore/params" "github.com/jackc/pgx/v5/pgconn" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" @@ -25,7 +25,7 @@ var ( writer = "writer" ) -func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repository], func()) { +func setUpStore(ctx context.Context, t *testing.T) (*sqlstore.Store[repos.Repository], func()) { connectParameters := params.GetConnectParameters() if connectParameters.Host == "" || connectParameters.Port == 0 { t.Skip() @@ -42,7 +42,7 @@ func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repos } } -func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*datastore.Store[repos.Repository], error) { +func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*sqlstore.Store[repos.Repository], error) { s, err := Init(ctx, logging.Logger, false) if err != nil { @@ -54,7 +54,7 @@ func newTestStore(ctx context.Context, t *testing.T, connectParameters params.Co } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { +func cleanUp(ctx context.Context, s *sqlstore.Store[repos.Repository]) error { const query = ` DELETE FROM subscriptions WHERE id IS NOT NULL; DELETE FROM identification_service_areas WHERE id IS NOT NULL;` diff --git a/pkg/rid/store/datastore/subscriptions.go b/pkg/rid/store/sqlstore/subscriptions.go similarity index 99% rename from pkg/rid/store/datastore/subscriptions.go rename to pkg/rid/store/sqlstore/subscriptions.go index 0b3dd6e3e..ae4429fa8 100644 --- a/pkg/rid/store/datastore/subscriptions.go +++ b/pkg/rid/store/sqlstore/subscriptions.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/rid/store/datastore/subscriptions_test.go b/pkg/rid/store/sqlstore/subscriptions_test.go similarity index 99% rename from pkg/rid/store/datastore/subscriptions_test.go rename to pkg/rid/store/sqlstore/subscriptions_test.go index 0d3c6d8fa..ce729847d 100644 --- a/pkg/rid/store/datastore/subscriptions_test.go +++ b/pkg/rid/store/sqlstore/subscriptions_test.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index 46bbdea14..673defd82 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -4,7 +4,7 @@ import ( "context" "github.com/interuss/dss/pkg/rid/repos" - riddatastore "github.com/interuss/dss/pkg/rid/store/datastore" + ridsqlstore "github.com/interuss/dss/pkg/rid/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" "go.uber.org/zap" ) @@ -15,5 +15,5 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the rid store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { - return riddatastore.Init(ctx, logger, withCheckCron) + return ridsqlstore.Init(ctx, logger, withCheckCron) } diff --git a/pkg/scd/store/datastore/doc.go b/pkg/scd/store/datastore/doc.go deleted file mode 100644 index ebcb40863..000000000 --- a/pkg/scd/store/datastore/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package scd.store.datastore provides a full implementation of store.Store[scd.repos.Repository] -// for data backings that use a database such as CockroachDB or YugabyteDB. -package datastore diff --git a/pkg/scd/store/datastore/availability.go b/pkg/scd/store/sqlstore/availability.go similarity index 99% rename from pkg/scd/store/datastore/availability.go rename to pkg/scd/store/sqlstore/availability.go index d39987c4b..b23861087 100644 --- a/pkg/scd/store/datastore/availability.go +++ b/pkg/scd/store/sqlstore/availability.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/scd/store/datastore/constraints.go b/pkg/scd/store/sqlstore/constraints.go similarity index 99% rename from pkg/scd/store/datastore/constraints.go rename to pkg/scd/store/sqlstore/constraints.go index 2923ee167..45580b851 100644 --- a/pkg/scd/store/datastore/constraints.go +++ b/pkg/scd/store/sqlstore/constraints.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/scd/store/sqlstore/doc.go b/pkg/scd/store/sqlstore/doc.go new file mode 100644 index 000000000..227be5fde --- /dev/null +++ b/pkg/scd/store/sqlstore/doc.go @@ -0,0 +1,3 @@ +// Package scd.store.sqlstore provides a full implementation of store.Store[scd.repos.Repository] +// for data backings that use a database such as CockroachDB or YugabyteDB. +package sqlstore diff --git a/pkg/scd/store/datastore/operational_intents.go b/pkg/scd/store/sqlstore/operational_intents.go similarity index 99% rename from pkg/scd/store/datastore/operational_intents.go rename to pkg/scd/store/sqlstore/operational_intents.go index 282b673c5..2a6250624 100644 --- a/pkg/scd/store/datastore/operational_intents.go +++ b/pkg/scd/store/sqlstore/operational_intents.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/scd/store/datastore/operational_intents_test.go b/pkg/scd/store/sqlstore/operational_intents_test.go similarity index 99% rename from pkg/scd/store/datastore/operational_intents_test.go rename to pkg/scd/store/sqlstore/operational_intents_test.go index e6a502659..a296b2631 100644 --- a/pkg/scd/store/datastore/operational_intents_test.go +++ b/pkg/scd/store/sqlstore/operational_intents_test.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/scd/store/datastore/store.go b/pkg/scd/store/sqlstore/store.go similarity index 66% rename from pkg/scd/store/datastore/store.go rename to pkg/scd/store/sqlstore/store.go index 3b48fd3ba..02c8b31cc 100644 --- a/pkg/scd/store/datastore/store.go +++ b/pkg/scd/store/sqlstore/store.go @@ -1,24 +1,24 @@ -package datastore +package sqlstore import ( "context" dssql "github.com/interuss/dss/pkg/sql" - "github.com/interuss/dss/pkg/datastore" "github.com/interuss/dss/pkg/logging" "github.com/interuss/dss/pkg/scd/repos" + "github.com/interuss/dss/pkg/sqlstore" "github.com/jonboulle/clockwork" "go.uber.org/zap" ) const ( - // The current major schema version per datastore type. + // The current major schema version per sqlstore type. currentCrdbMajorSchemaVersion = 3 currentYugabyteMajorSchemaVersion = 1 ) -// scd.store.datastore.repo is a full implementation of scd.repos.Repository for data backings that +// scd.store.sqlstore.repo is a full implementation of scd.repos.Repository for data backings that // use a database such as CockroachDB or YugabyteDB. type repo struct { q dssql.Queryable @@ -27,14 +27,14 @@ type repo struct { globalLock bool } -// Init initializes the SQL-backed sid store. It return a concrete datastore.Store[sid.repos.Repository] providing the +// Init initializes the SQL-backed sid store. It return a concrete sqlstore.Store[sid.repos.Repository] providing the // ability to interact with a database-backed store of sid information. -func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (*datastore.Store[repos.Repository], error) { - return datastore.Init(ctx, datastore.Config[repos.Repository]{ +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (*sqlstore.Store[repos.Repository], error) { + return sqlstore.Init(ctx, sqlstore.Config[repos.Repository]{ DBName: "scd", CrdbMajorSchemaVersion: currentCrdbMajorSchemaVersion, YbMajorSchemaVersion: currentYugabyteMajorSchemaVersion, - NewRepo: func(q dssql.Queryable, clock clockwork.Clock, _ *datastore.Version) repos.Repository { + NewRepo: func(q dssql.Queryable, clock clockwork.Clock, _ *sqlstore.Version) repos.Repository { return &repo{ q: q, clock: clock, diff --git a/pkg/scd/store/datastore/store_test.go b/pkg/scd/store/sqlstore/store_test.go similarity index 76% rename from pkg/scd/store/datastore/store_test.go rename to pkg/scd/store/sqlstore/store_test.go index f86218a7b..3733d737b 100644 --- a/pkg/scd/store/datastore/store_test.go +++ b/pkg/scd/store/sqlstore/store_test.go @@ -1,13 +1,13 @@ -package datastore +package sqlstore import ( "context" "testing" - "github.com/interuss/dss/pkg/datastore" - "github.com/interuss/dss/pkg/datastore/params" "github.com/interuss/dss/pkg/logging" "github.com/interuss/dss/pkg/scd/repos" + "github.com/interuss/dss/pkg/sqlstore" + "github.com/interuss/dss/pkg/sqlstore/params" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) @@ -16,7 +16,7 @@ var ( fakeClock = clockwork.NewFakeClock() ) -func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repository], func()) { +func setUpStore(ctx context.Context, t *testing.T) (*sqlstore.Store[repos.Repository], func()) { connectParameters := params.GetConnectParameters() if connectParameters.Host == "" || connectParameters.Port == 0 { t.Skip() @@ -33,7 +33,7 @@ func setUpStore(ctx context.Context, t *testing.T) (*datastore.Store[repos.Repos } } -func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*datastore.Store[repos.Repository], error) { +func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*sqlstore.Store[repos.Repository], error) { s, err := Init(ctx, logging.Logger, false, false) if err != nil { @@ -45,7 +45,7 @@ func newTestStore(ctx context.Context, t *testing.T, connectParameters params.Co } // cleanUp drops all required tables from the store, useful for testing. -func cleanUp(ctx context.Context, s *datastore.Store[repos.Repository]) error { +func cleanUp(ctx context.Context, s *sqlstore.Store[repos.Repository]) error { const query = ` DELETE FROM scd_subscriptions WHERE id IS NOT NULL; DELETE FROM scd_operations WHERE id IS NOT NULL; diff --git a/pkg/scd/store/datastore/subscriptions.go b/pkg/scd/store/sqlstore/subscriptions.go similarity index 99% rename from pkg/scd/store/datastore/subscriptions.go rename to pkg/scd/store/sqlstore/subscriptions.go index 335cb0c06..174b131b2 100644 --- a/pkg/scd/store/datastore/subscriptions.go +++ b/pkg/scd/store/sqlstore/subscriptions.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/scd/store/datastore/subscriptions_test.go b/pkg/scd/store/sqlstore/subscriptions_test.go similarity index 99% rename from pkg/scd/store/datastore/subscriptions_test.go rename to pkg/scd/store/sqlstore/subscriptions_test.go index 181981c3c..3ff3be752 100644 --- a/pkg/scd/store/datastore/subscriptions_test.go +++ b/pkg/scd/store/sqlstore/subscriptions_test.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index 0ccffecb6..c7d98b593 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -4,7 +4,7 @@ import ( "context" "github.com/interuss/dss/pkg/scd/repos" - scddatastore "github.com/interuss/dss/pkg/scd/store/datastore" + scdsqlstore "github.com/interuss/dss/pkg/scd/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" "go.uber.org/zap" ) @@ -15,5 +15,5 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the scd store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (Store, error) { - return scddatastore.Init(ctx, logger, withCheckCron, globalLock) + return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) } diff --git a/pkg/sqlstore/doc.go b/pkg/sqlstore/doc.go new file mode 100644 index 000000000..9e6bc8e16 --- /dev/null +++ b/pkg/sqlstore/doc.go @@ -0,0 +1,3 @@ +// Package sqlstore bundles up types and functions common to data backings that use a database +// (such as CRDB or Yugabyte). +package sqlstore diff --git a/pkg/datastore/params/params.go b/pkg/sqlstore/params/params.go similarity index 100% rename from pkg/datastore/params/params.go rename to pkg/sqlstore/params/params.go diff --git a/pkg/datastore/params/params_test.go b/pkg/sqlstore/params/params_test.go similarity index 100% rename from pkg/datastore/params/params_test.go rename to pkg/sqlstore/params/params_test.go diff --git a/pkg/datastore/store.go b/pkg/sqlstore/store.go similarity index 96% rename from pkg/datastore/store.go rename to pkg/sqlstore/store.go index 33dcaf383..e3bff1ce3 100644 --- a/pkg/datastore/store.go +++ b/pkg/sqlstore/store.go @@ -1,4 +1,4 @@ -package datastore +package sqlstore import ( "context" @@ -10,9 +10,9 @@ import ( crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/coreos/go-semver/semver" "github.com/exaring/otelpgx" - "github.com/interuss/dss/pkg/datastore/params" "github.com/interuss/dss/pkg/logging" dsssql "github.com/interuss/dss/pkg/sql" + "github.com/interuss/dss/pkg/sqlstore/params" "github.com/interuss/stacktrace" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -102,13 +102,13 @@ func Dial[R any](ctx context.Context, connParams params.ConnectParameters) (*Sto var fullVersion string err = dbPool.QueryRow(ctx, versionDbQuery).Scan(&fullVersion) if err != nil { - return nil, stacktrace.Propagate(err, "Error querying datastore version") + return nil, stacktrace.Propagate(err, "Error querying store version") } version, err := NewVersion(fullVersion) if err != nil { - return nil, stacktrace.Propagate(err, "Error querying datastore version") + return nil, stacktrace.Propagate(err, "Error querying store version") } if version.Type == CockroachDB || version.Type == Yugabyte { @@ -128,7 +128,7 @@ func Init[R any](ctx context.Context, cfg Config[R], withCheckCron bool) (*Store db, err := Dial[R](ctx, cp) if err != nil { if strings.Contains(err.Error(), "connect: connection refused") { - return nil, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to datastore server for %s", cfg.DBName) + return nil, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to database server for %s", cfg.DBName) } return nil, stacktrace.Propagate(err, "Failed to connect to %s database", cfg.DBName) } diff --git a/pkg/datastore/version.go b/pkg/sqlstore/version.go similarity index 98% rename from pkg/datastore/version.go rename to pkg/sqlstore/version.go index 5ffcb6b2b..c77549fc9 100644 --- a/pkg/datastore/version.go +++ b/pkg/sqlstore/version.go @@ -1,10 +1,11 @@ -package datastore +package sqlstore import ( + "regexp" + "github.com/coreos/go-semver/semver" "github.com/interuss/stacktrace" "go.uber.org/multierr" - "regexp" ) type Type string From ab128c4e6eb0db1279156f752eee801dc767f1f6 Mon Sep 17 00:00:00 2001 From: Maximilien Cuony Date: Wed, 22 Apr 2026 14:34:55 +0200 Subject: [PATCH 4/6] [store] Move CodeRetryable to generic store --- cmds/core-service/main.go | 4 ++-- pkg/sqlstore/store.go | 9 +++------ pkg/store/store.go | 6 ++++++ 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 212d6ddbb..9eeed0746 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -30,7 +30,7 @@ import ( rids "github.com/interuss/dss/pkg/rid/store" "github.com/interuss/dss/pkg/scd" scds "github.com/interuss/dss/pkg/scd/store" - "github.com/interuss/dss/pkg/sqlstore" + "github.com/interuss/dss/pkg/store" "github.com/interuss/dss/pkg/version" "github.com/interuss/dss/pkg/versioning" "github.com/interuss/stacktrace" @@ -350,7 +350,7 @@ func main() { backoff := 0 for { if err := RunHTTPServer(ctx, cancel, *address, *locality); err != nil { - if stacktrace.GetCode(err) == sqlstore.CodeRetryable { + if stacktrace.GetCode(err) == store.CodeRetryable { logger.Info(fmt.Sprintf("Prerequisites not yet satisfied; waiting %.fs to retry...", backoffs[backoff].Seconds()), zap.Error(err)) time.Sleep(backoffs[backoff]) if backoff < len(backoffs)-1 { diff --git a/pkg/sqlstore/store.go b/pkg/sqlstore/store.go index e3bff1ce3..a2a5f3ea5 100644 --- a/pkg/sqlstore/store.go +++ b/pkg/sqlstore/store.go @@ -13,6 +13,7 @@ import ( "github.com/interuss/dss/pkg/logging" dsssql "github.com/interuss/dss/pkg/sql" "github.com/interuss/dss/pkg/sqlstore/params" + "github.com/interuss/dss/pkg/store" "github.com/interuss/stacktrace" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -21,10 +22,6 @@ import ( "go.uber.org/zap" ) -const ( - CodeRetryable = stacktrace.ErrorCode(1) -) - var UnknownVersion = &semver.Version{} // Store is a partial implementation of store.Store when the data backing is a database. @@ -128,7 +125,7 @@ func Init[R any](ctx context.Context, cfg Config[R], withCheckCron bool) (*Store db, err := Dial[R](ctx, cp) if err != nil { if strings.Contains(err.Error(), "connect: connection refused") { - return nil, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to database server for %s", cfg.DBName) + return nil, stacktrace.PropagateWithCode(err, store.CodeRetryable, "Failed to connect to database server for %s", cfg.DBName) } return nil, stacktrace.Propagate(err, "Failed to connect to %s database", cfg.DBName) } @@ -141,7 +138,7 @@ func Init[R any](ctx context.Context, cfg Config[R], withCheckCron bool) (*Store if err != nil { db.Pool.Close() if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), fmt.Sprintf("database \"%s\" does not exist", cfg.DBName)) || strings.Contains(err.Error(), "database has not been bootstrapped with Schema Manager") { - return nil, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to create %s store", cfg.DBName) + return nil, stacktrace.PropagateWithCode(err, store.CodeRetryable, "Failed to create %s store", cfg.DBName) } return nil, stacktrace.Propagate(err, "Failed to create %s store", cfg.DBName) } diff --git a/pkg/store/store.go b/pkg/store/store.go index e9fbfdeb7..e5e95c4b0 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -3,6 +3,8 @@ package store import ( "context" "io" + + "github.com/interuss/stacktrace" ) // store.Store is the generic means to access and interact with any type of data backing the DSS @@ -16,3 +18,7 @@ type Store[R any] interface { // on the R Repo by f will be applied or rejected atomically. Transact(ctx context.Context, f func(context.Context, R) error) error } + +const ( + CodeRetryable = stacktrace.ErrorCode(1) +) From e3f68024e8e83beb1907d30e44d651f8591fc5ba Mon Sep 17 00:00:00 2001 From: Maximilien Cuony Date: Wed, 22 Apr 2026 14:43:19 +0200 Subject: [PATCH 5/6] [store] Add store type flag --- pkg/aux_/store/store.go | 10 +++++++++- pkg/rid/store/store.go | 10 +++++++++- pkg/scd/store/store.go | 10 +++++++++- pkg/store/params/params.go | 25 +++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 pkg/store/params/params.go diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 481fb62e1..6710d80bb 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -6,6 +6,8 @@ import ( "github.com/interuss/dss/pkg/aux_/repos" auxsqlstore "github.com/interuss/dss/pkg/aux_/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" + "github.com/interuss/dss/pkg/store/params" + "github.com/interuss/stacktrace" "go.uber.org/zap" ) @@ -16,5 +18,11 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the aux store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { - return auxsqlstore.Init(ctx, logger, withCheckCron) + storeType := params.GetStoreParameters().StoreType + switch storeType { + case "sql": + return auxsqlstore.Init(ctx, logger, withCheckCron) + default: + return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) + } } diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index 673defd82..026846306 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -6,6 +6,8 @@ import ( "github.com/interuss/dss/pkg/rid/repos" ridsqlstore "github.com/interuss/dss/pkg/rid/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" + "github.com/interuss/dss/pkg/store/params" + "github.com/interuss/stacktrace" "go.uber.org/zap" ) @@ -15,5 +17,11 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the rid store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { - return ridsqlstore.Init(ctx, logger, withCheckCron) + storeType := params.GetStoreParameters().StoreType + switch storeType { + case "sql": + return ridsqlstore.Init(ctx, logger, withCheckCron) + default: + return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType) + } } diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index c7d98b593..9e977655d 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -6,6 +6,8 @@ import ( "github.com/interuss/dss/pkg/scd/repos" scdsqlstore "github.com/interuss/dss/pkg/scd/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" + "github.com/interuss/dss/pkg/store/params" + "github.com/interuss/stacktrace" "go.uber.org/zap" ) @@ -15,5 +17,11 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the scd store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (Store, error) { - return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) + storeType := params.GetStoreParameters().StoreType + switch storeType { + case "sql": + return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) + default: + return nil, stacktrace.NewError("Unsupported store type %q for scd", storeType) + } } diff --git a/pkg/store/params/params.go b/pkg/store/params/params.go new file mode 100644 index 000000000..48af00d19 --- /dev/null +++ b/pkg/store/params/params.go @@ -0,0 +1,25 @@ +package params + +import ( + "flag" +) + +type ( + // StoreParameters bundles up parameters used to configure store at a generic/top level. + StoreParameters struct { + StoreType string + } +) + +var ( + storeParameters StoreParameters +) + +func init() { + flag.StringVar(&storeParameters.StoreType, "store_type", "sql", "Store type. Use 'sql' for CockroachDB/YugabyteDB") +} + +// ConnectParameters returns a ConnectParameters instance that gets populated from well-known CLI flags. +func GetStoreParameters() StoreParameters { + return storeParameters +} From 609124ad1802914736ca296630669ea169b167c9 Mon Sep 17 00:00:00 2001 From: Maximilien Cuony Date: Wed, 22 Apr 2026 14:55:25 +0200 Subject: [PATCH 6/6] [store] Add raft example basic implementation --- pkg/aux_/store/raftstore/doc.go | 3 ++ pkg/aux_/store/raftstore/dss.go | 30 +++++++++++++++ pkg/aux_/store/raftstore/store.go | 30 +++++++++++++++ pkg/aux_/store/store.go | 3 ++ pkg/raftstore/doc.go | 2 + pkg/raftstore/params/params.go | 25 +++++++++++++ pkg/raftstore/store.go | 61 +++++++++++++++++++++++++++++++ pkg/store/params/params.go | 2 +- 8 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 pkg/aux_/store/raftstore/doc.go create mode 100644 pkg/aux_/store/raftstore/dss.go create mode 100644 pkg/aux_/store/raftstore/store.go create mode 100644 pkg/raftstore/doc.go create mode 100644 pkg/raftstore/params/params.go create mode 100644 pkg/raftstore/store.go diff --git a/pkg/aux_/store/raftstore/doc.go b/pkg/aux_/store/raftstore/doc.go new file mode 100644 index 000000000..5cd3208e7 --- /dev/null +++ b/pkg/aux_/store/raftstore/doc.go @@ -0,0 +1,3 @@ +// Package aux_.store.raftstore provides a concrete implementation of a +// store.Store[aux_.repos.Repository] using a raft csluter as a data backing +package raftstore diff --git a/pkg/aux_/store/raftstore/dss.go b/pkg/aux_/store/raftstore/dss.go new file mode 100644 index 000000000..c4c4a798e --- /dev/null +++ b/pkg/aux_/store/raftstore/dss.go @@ -0,0 +1,30 @@ +package raftstore + +import ( + "context" + + auxmodels "github.com/interuss/dss/pkg/aux_/models" + dsserr "github.com/interuss/dss/pkg/errors" + "github.com/interuss/stacktrace" +) + +func (r *repo) SaveOwnMetadata(ctx context.Context, locality string, publicEndpoint string) error { + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SaveOwnMetadata is not yet supported in raftstore") +} + +func (r *repo) GetDSSMetadata(ctx context.Context) ([]*auxmodels.DSSMetadata, error) { + + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDSSMetadata is not yet supported in raftstore") + +} + +func (r *repo) RecordHeartbeat(ctx context.Context, heartbeat auxmodels.Heartbeat) error { + + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "RecordHeartbeat is not yet supported in raftstore") + +} + +// GetDSSAirspaceRepresentationID gets the ID of the common DSS Airspace Representation the raftstore represents. +func (r *repo) GetDSSAirspaceRepresentationID(ctx context.Context) (string, error) { + return "", stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDSSAirspaceRepresentationID is not yet supported in raftstore") +} diff --git a/pkg/aux_/store/raftstore/store.go b/pkg/aux_/store/raftstore/store.go new file mode 100644 index 000000000..b328484e7 --- /dev/null +++ b/pkg/aux_/store/raftstore/store.go @@ -0,0 +1,30 @@ +package raftstore + +import ( + "context" + + "github.com/interuss/dss/pkg/aux_/repos" + "github.com/interuss/dss/pkg/logging" + "github.com/interuss/dss/pkg/raftstore" + dssql "github.com/interuss/dss/pkg/sql" + "github.com/jonboulle/clockwork" + "go.uber.org/zap" +) + +type repo struct { + clock clockwork.Clock + logger *zap.Logger +} + +// Init initializes the SQL-backed rid store. It return a concrete raftstore.Store[aux_.repos.Repository] providing the +// ability to interact with a database-backed store of aux information. +func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*raftstore.Store[repos.Repository], error) { + return raftstore.Init(ctx, raftstore.Config[repos.Repository]{ + NewRepo: func(q dssql.Queryable, clock clockwork.Clock) repos.Repository { + return &repo{ + clock: clock, + logger: logging.WithValuesFromContext(ctx, logger), + } + }, + }, withCheckCron) +} diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 6710d80bb..61de4eeca 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -4,6 +4,7 @@ import ( "context" "github.com/interuss/dss/pkg/aux_/repos" + auxraftstore "github.com/interuss/dss/pkg/aux_/store/raftstore" auxsqlstore "github.com/interuss/dss/pkg/aux_/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" "github.com/interuss/dss/pkg/store/params" @@ -22,6 +23,8 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType { case "sql": return auxsqlstore.Init(ctx, logger, withCheckCron) + case "raft": + return auxraftstore.Init(ctx, logger, withCheckCron) default: return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) } diff --git a/pkg/raftstore/doc.go b/pkg/raftstore/doc.go new file mode 100644 index 000000000..6166c129c --- /dev/null +++ b/pkg/raftstore/doc.go @@ -0,0 +1,2 @@ +// Package raftstore bundles up types and functions common to data backings that use a raft cluster +package raftstore diff --git a/pkg/raftstore/params/params.go b/pkg/raftstore/params/params.go new file mode 100644 index 000000000..b1824747a --- /dev/null +++ b/pkg/raftstore/params/params.go @@ -0,0 +1,25 @@ +package params + +import ( + "flag" +) + +type ( + // ConnectParameters bundles up parameters used for connecting to a raft cluster. + ConnectParameters struct { + Peers string + } +) + +var ( + connectParameters ConnectParameters +) + +func init() { + flag.StringVar(&connectParameters.Peers, "raft_peers", "", "comma-separated list of raft cluster peers, e.g. node-1=10.0.0.1:7000,node-2=10.0.0.2:7000") +} + +// ConnectParameters returns a ConnectParameters instance that gets populated from well-known CLI flags. +func GetConnectParameters() ConnectParameters { + return connectParameters +} diff --git a/pkg/raftstore/store.go b/pkg/raftstore/store.go new file mode 100644 index 000000000..71012e814 --- /dev/null +++ b/pkg/raftstore/store.go @@ -0,0 +1,61 @@ +package raftstore + +import ( + "context" + "strings" + + "github.com/coreos/go-semver/semver" + "github.com/interuss/dss/pkg/raftstore/params" + dsssql "github.com/interuss/dss/pkg/sql" + "github.com/interuss/dss/pkg/store" + "github.com/interuss/stacktrace" + "github.com/jonboulle/clockwork" +) + +var UnknownVersion = &semver.Version{} + +// Store is a partial implementation of store.Store when the data backing is a raft store. +type Store[R any] struct { +} + +// Config describes everything a raft-backed store needs to be initialized for a +// given specific package (rid, scd, aux, ...). +type Config[R any] struct { + NewRepo func(q dsssql.Queryable, clock clockwork.Clock) R +} + +func Dial[R any](ctx context.Context, connParams params.ConnectParameters) (*Store[R], error) { + + // Connect via connParams.Peers + return &Store[R]{}, nil + +} + +// Init dials the database described by the global connect parameters (plus +// cfg.DBName), checks its schema version, and returns a ready-to-use Store[R]. +// If withCheckCron is true, a periodic health-check cron is started. +func Init[R any](ctx context.Context, cfg Config[R], withCheckCron bool) (*Store[R], error) { + + db, err := Dial[R](ctx, params.GetConnectParameters()) + if err != nil { + if strings.Contains(err.Error(), "connect: connection refused") { + return nil, stacktrace.PropagateWithCode(err, store.CodeRetryable, "Failed to connect to raft cluster") + } + return nil, stacktrace.Propagate(err, "Failed to connect to the raft cluster") + } + + return db, nil +} + +func (s *Store[R]) Interact(_ context.Context) (R, error) { + var zero R + return zero, stacktrace.NewError("Not implemented") +} + +func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error { + return stacktrace.NewError("Not implemented") +} + +func (s *Store[R]) Close() error { + return stacktrace.NewError("Not implemented") +} diff --git a/pkg/store/params/params.go b/pkg/store/params/params.go index 48af00d19..d8591b1aa 100644 --- a/pkg/store/params/params.go +++ b/pkg/store/params/params.go @@ -16,7 +16,7 @@ var ( ) func init() { - flag.StringVar(&storeParameters.StoreType, "store_type", "sql", "Store type. Use 'sql' for CockroachDB/YugabyteDB") + flag.StringVar(&storeParameters.StoreType, "store_type", "sql", "Store type. Use 'sql' for CockroachDB/YugabyteDB, or 'raft' for raft implementation") } // ConnectParameters returns a ConnectParameters instance that gets populated from well-known CLI flags.