Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Refactor extension logic to be the same between extensions
Browse files Browse the repository at this point in the history
Make timescaledb and promscale PG extension migration use the same
functions.
  • Loading branch information
cevian committed Oct 3, 2020
1 parent b6f7fe0 commit 074867c
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 40 deletions.
4 changes: 4 additions & 0 deletions pkg/pgmodel/end_to_end_tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func withDB(t testing.TB, DBName string, f func(db *pgxpool.Pool, t testing.TB))
}

func performMigrate(t testing.TB, connectURL string) {
err := pgmodel.MigrateTimescaleDBExtension(connectURL)
if err != nil {
t.Fatal(err)
}
migratePool, err := pgxpool.Connect(context.Background(), connectURL)
if err != nil {
t.Fatal(err)
Expand Down
68 changes: 37 additions & 31 deletions pkg/pgmodel/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,62 +76,70 @@ func checkPromscaleExtensionVersion(conn *pgx.Conn) error {
return nil
}

func migrateExtension(conn *pgx.Conn) error {
availableVersions, err := fetchAvailableExtensionVersions(conn)
ExtensionIsInstalled = false
func migrateExtension(conn *pgx.Conn, extName string, extSchemaName string, validRange semver.Range, rangeString string) error {
availableVersions, err := fetchAvailableExtensionVersions(conn, extName)
if err != nil {
return err
}
if len(availableVersions) == 0 {
log.Warn("msg", "the promscale extension is not available, proceeding without extension")
return nil
return fmt.Errorf("the extension is not available")
}

newVersion, ok := getNewExtensionVersion(availableVersions)
newVersion, ok := getNewExtensionVersion(availableVersions, validRange)
if !ok {
log.Warn("msg", "the promscale extension is not available at the right version, need version: %v, proceeding without extension", version.ExtVersionRangeString)
return nil
return fmt.Errorf("the extension is not available at the right version, need version: %v", rangeString)
}

currentVersion, isInstalled, err := fetchInstalledExtensionVersion(conn, "promscale")
currentVersion, isInstalled, err := fetchInstalledExtensionVersion(conn, extName)
if err != nil {
return fmt.Errorf("Could not get the installed extension version: %w", err)
}

if !isInstalled {
_, extErr := conn.Exec(context.Background(),
fmt.Sprintf("CREATE EXTENSION IF NOT EXISTS promscale WITH SCHEMA %s VERSION '%s'",
extSchema, getSqlVersion(newVersion)))
fmt.Sprintf("CREATE EXTENSION IF NOT EXISTS %s WITH SCHEMA %s VERSION '%s'",
extName, extSchemaName, getSqlVersion(newVersion, extName)))
if extErr != nil {
return extErr
}
return checkExtensionsVersion(conn)
return nil
}

comparator := currentVersion.Compare(newVersion)
if comparator > 0 {
//currentVersion greater than what we can handle, don't use the extension
log.Warn("msg", "the promscale extension at a greater version than supported by the connector: %v > %v, proceeding without extension", currentVersion, newVersion)
return fmt.Errorf("the extension at a greater version than supported by the connector: %v > %v", currentVersion, newVersion)
} else if comparator == 0 {
//Nothing to do we are at the correct version
return nil
} else {
//Upgrade to the right version
_, err := conn.Exec(context.Background(),
fmt.Sprintf("ALTER EXTENSION promscale UPDATE TO '%s'",
getSqlVersion(newVersion)))
connAlter := conn
if extName == "timescaledb" {
//TimescaleDB requires a fresh connection for altering
//Note: all previously opened connections will become invalid
connAlter, err = pgx.ConnectConfig(context.Background(), conn.Config())
if err != nil {
return err
}
defer func() { _ = connAlter.Close(context.Background()) }()
}
_, err := connAlter.Exec(context.Background(),
fmt.Sprintf("ALTER EXTENSION %s UPDATE TO '%s'", extName,
getSqlVersion(newVersion, extName)))
if err != nil {
return err
}
}

return checkExtensionsVersion(conn)
return nil
}

func fetchAvailableExtensionVersions(conn *pgx.Conn) (semver.Versions, error) {
func fetchAvailableExtensionVersions(conn *pgx.Conn, extName string) (semver.Versions, error) {
var versionStrings []string
versions := make(semver.Versions, 0)
err := conn.QueryRow(context.Background(),
"SELECT array_agg(version) FROM pg_available_extension_versions WHERE name ='promscale'").Scan(&versionStrings)
"SELECT array_agg(version) FROM pg_available_extension_versions WHERE name = $1", extName).Scan(&versionStrings)

if err != nil {
return versions, err
Expand All @@ -141,7 +149,7 @@ func fetchAvailableExtensionVersions(conn *pgx.Conn) (semver.Versions, error) {
}

for i := range versionStrings {
vString := correctVersionString(versionStrings[i])
vString := correctVersionString(versionStrings[i], extName)
v, err := semver.Parse(vString)
if err != nil {
return versions, fmt.Errorf("Could not parse available extra extension version %v: %w", vString, err)
Expand All @@ -156,7 +164,7 @@ func fetchInstalledExtensionVersion(conn *pgx.Conn, extensionName string) (semve
var versionString string
if err := conn.QueryRow(
context.Background(),
"SELECT extversion FROM pg_extension WHERE extname=$1;",
"SELECT extversion FROM pg_extension WHERE extName=$1;",
extensionName,
).Scan(&versionString); err != nil {
if err == pgx.ErrNoRows {
Expand All @@ -165,9 +173,7 @@ func fetchInstalledExtensionVersion(conn *pgx.Conn, extensionName string) (semve
return semver.Version{}, true, err
}

if extensionName == "promscale" {
versionString = correctVersionString(versionString)
}
versionString = correctVersionString(versionString, extensionName)

v, err := semver.Parse(versionString)
if err != nil {
Expand All @@ -176,27 +182,27 @@ func fetchInstalledExtensionVersion(conn *pgx.Conn, extensionName string) (semve
return v, true, nil
}

func correctVersionString(v string) string {
func correctVersionString(v string, extName string) string {
//we originally published the extension as "0.1" which isn't a valid semver
if v == "0.1" {
if extName == "promscale" && v == "0.1" {
return "0.1.0"
}
return v
}

func getSqlVersion(v semver.Version) string {
if v.String() == "0.1.0" {
func getSqlVersion(v semver.Version, extName string) string {
if extName == "promscale" && v.String() == "0.1.0" {
return "0.1"
}
return v.String()
}

// getNewExtensionVersion returns the highest version allowed by ExtVersionRange
func getNewExtensionVersion(availableVersions semver.Versions) (semver.Version, bool) {
// getNewExtensionVersion returns the highest version allowed by validRange
func getNewExtensionVersion(availableVersions semver.Versions, validRange semver.Range) (semver.Version, bool) {
//sort higher extensions first
sort.Sort(sort.Reverse(availableVersions))
for i := range availableVersions {
if version.ExtVersionRange(availableVersions[i]) {
if validRange(availableVersions[i]) {
return availableVersions[i], true
}
}
Expand Down
41 changes: 33 additions & 8 deletions pkg/pgmodel/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"github.com/jackc/pgx/v4"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/migrations"
"github.com/timescale/promscale/pkg/version"
)

const (
timescaleInstall = "CREATE EXTENSION IF NOT EXISTS timescaledb WITH SCHEMA public;"
metadataUpdateWithExtension = "SELECT update_tsprom_metadata($1, $2, $3)"
metadataUpdateNoExtension = "INSERT INTO _timescaledb_catalog.metadata(key, value, include_in_telemetry) VALUES ('promscale_' || $1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, include_in_telemetry = EXCLUDED.include_in_telemetry"
createMigrationsTable = "CREATE TABLE IF NOT EXISTS prom_schema_migrations (version text not null primary key)"
Expand Down Expand Up @@ -82,6 +82,27 @@ func (p prefixedNames) getNames() []string {
return names
}

// MigrateTimescaleDBExtension installs or updates TimescaleDB
// Note that after this call any previous connections can break
// so this has to be called ahead of opening connections.
//
// Also this takes a connection string not a connection because for
// updates the ALTER has to be the first command on the connection
// thus we cannot reuse existing connections
func MigrateTimescaleDBExtension(connstr string) error {
db, err := pgx.Connect(context.Background(), connstr)
if err != nil {
return err
}
defer func() { _ = db.Close(context.Background()) }()

err = migrateExtension(db, "timescaledb", "public", version.TimescaleVersionRange, version.TimescaleVersionRangeFullString)
if err != nil {
return fmt.Errorf("could not install timescaledb: %w", err)
}
return nil
}

// Migrate performs a database migration to the latest version
func Migrate(db *pgx.Conn, versionInfo VersionInfo) (err error) {
migrateMutex.Lock()
Expand All @@ -100,8 +121,13 @@ func Migrate(db *pgx.Conn, versionInfo VersionInfo) (err error) {
return fmt.Errorf("Error encountered during migration: %w", err)
}

err = migrateExtension(db)
ExtensionIsInstalled = false
err = migrateExtension(db, "promscale", extSchema, version.ExtVersionRange, version.ExtVersionRangeString)
if err != nil {
log.Warn("msg", fmt.Sprintf("could not install promscale: %v. continuing without extension", err))
}

if err = checkExtensionsVersion(db); err != nil {
return fmt.Errorf("Error encountered while migrating extension: %w", err)
}

Expand All @@ -118,7 +144,11 @@ func CheckDependencies(db *pgx.Conn, versionInfo VersionInfo) (err error) {
return err
}

return checkExtensionsVersion(db)
if err = checkExtensionsVersion(db); err != nil {
return err
}

return err
}

// CheckSchemaVersion checks the DB schema version without checking the extension
Expand Down Expand Up @@ -196,11 +226,6 @@ func (t *Migrator) Migrate(appVersion semver.Version) error {
_ = tx.Rollback(context.Background())
}()

_, err = tx.Exec(context.Background(), timescaleInstall)
if err != nil {
return fmt.Errorf("timescaledb failed to install due to %w", err)
}

// No version in DB.
if dbVersion.Compare(semver.Version{}) == 0 {
if err = t.execMigrationDir(tx, preinstallScripts); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/pgmodel/upgrade_tests/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,17 @@ func withNewDBAtCurrentVersion(t testing.TB, DBName string,
}

func migrateToVersion(t testing.TB, connectURL string, version string, commitHash string) {
err := pgmodel.MigrateTimescaleDBExtension(connectURL)
if err != nil {
t.Fatal(err)
}

migratePool, err := pgx.Connect(context.Background(), connectURL)
if err != nil {
t.Fatal(err)
}
defer func() { _ = migratePool.Close(context.Background()) }()
err = Migrate(migratePool, pgmodel.VersionInfo{Version: version, CommitHash: commitHash})
err = Migrate(migratePool, pgmodel.VersionInfo{Version: version, CommitHash: commitHash}, &pgmodel.MigrateOptions{InstallTimescaleDB: true})
if err != nil {
t.Fatal(err)
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ func Run(cfg *Config) error {
}

func CreateClient(cfg *Config, promMetrics *api.Metrics) (*pgclient.Client, error) {
//The TimescaleDB migration has to happen before other connections
//are open. Also, it has to happen as the first command on a connection
//thus, we cannot rely on a migration lock here. Instead we assume
//that upgrading TimescaleDB will not break existing connectors.
if cfg.InstallTimescaleDB {
err := pgmodel.MigrateTimescaleDBExtension(cfg.PgmodelCfg.GetConnectionStr())
if err != nil {
return nil, err
}
}

// Migration lock logic:
// We don't want to upgrade the schema version while we still have connectors
// attached who think the schema is at the old version. To prevent this, as
Expand Down
3 changes: 3 additions & 0 deletions pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
timescaleVersionSafeRange = semver.MustParseRange(TimescaleVersionRangeString.Safe)
timescaleVersionWarnRange = semver.MustParseRange(TimescaleVersionRangeString.Warn)

TimescaleVersionRangeFullString = TimescaleVersionRangeString.Safe + " || " + TimescaleVersionRangeString.Warn
TimescaleVersionRange = timescaleVersionSafeRange.OR(timescaleVersionWarnRange)

// ExtVersionRangeString is a range of required promscale extension versions
ExtVersionRangeString = "=0.1.x"
ExtVersionRange = semver.MustParseRange(ExtVersionRangeString)
Expand Down

0 comments on commit 074867c

Please sign in to comment.