This repository has been archived by the owner on Jun 28, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 326
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
25 changed files
with
491 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
// +build cockroachdb | ||
|
||
package main | ||
|
||
import ( | ||
_ "github.com/mattes/migrate/database/cockroachdb" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,319 @@ | ||
package cockroachdb | ||
|
||
import ( | ||
"database/sql" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
nurl "net/url" | ||
|
||
"github.com/cockroachdb/cockroach-go/crdb" | ||
"github.com/lib/pq" | ||
"github.com/mattes/migrate" | ||
"github.com/mattes/migrate/database" | ||
"regexp" | ||
) | ||
|
||
func init() { | ||
db := CockroachDb{} | ||
database.Register("cockroach", &db) | ||
database.Register("cockroachdb", &db) | ||
database.Register("crdb-postgres", &db) | ||
} | ||
|
||
var DefaultMigrationsTable = "schema_migrations" | ||
var DefaultLockTable = "schema_lock" | ||
|
||
var ( | ||
ErrNilConfig = fmt.Errorf("no config") | ||
ErrNoDatabaseName = fmt.Errorf("no database name") | ||
) | ||
|
||
type Config struct { | ||
MigrationsTable string | ||
LockTable string | ||
DatabaseName string | ||
} | ||
|
||
type CockroachDb struct { | ||
db *sql.DB | ||
isLocked bool | ||
|
||
// Open and WithInstance need to guarantee that config is never nil | ||
config *Config | ||
} | ||
|
||
func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { | ||
if config == nil { | ||
return nil, ErrNilConfig | ||
} | ||
|
||
if err := instance.Ping(); err != nil { | ||
return nil, err | ||
} | ||
|
||
query := `SELECT current_database()` | ||
var databaseName string | ||
if err := instance.QueryRow(query).Scan(&databaseName); err != nil { | ||
return nil, &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
|
||
if len(databaseName) == 0 { | ||
return nil, ErrNoDatabaseName | ||
} | ||
|
||
config.DatabaseName = databaseName | ||
|
||
if len(config.MigrationsTable) == 0 { | ||
config.MigrationsTable = DefaultMigrationsTable | ||
} | ||
|
||
if len(config.LockTable) == 0 { | ||
config.LockTable = DefaultLockTable | ||
} | ||
|
||
px := &CockroachDb{ | ||
db: instance, | ||
config: config, | ||
} | ||
|
||
if err := px.ensureVersionTable(); err != nil { | ||
return nil, err | ||
} | ||
|
||
if err := px.ensureLockTable(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return px, nil | ||
} | ||
|
||
func (c *CockroachDb) Open(url string) (database.Driver, error) { | ||
purl, err := nurl.Parse(url) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// As Cockroach uses the postgres protocol, and 'postgres' is already a registered database, we need to replace the | ||
// connect prefix, with the actual protocol, so that the library can differentiate between the implementations | ||
re := regexp.MustCompile("^(cockroach(db)?|crdb-postgres)") | ||
connectString := re.ReplaceAllString(migrate.FilterCustomQuery(purl).String(), "postgres") | ||
|
||
db, err := sql.Open("postgres", connectString) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
migrationsTable := purl.Query().Get("x-migrations-table") | ||
if len(migrationsTable) == 0 { | ||
migrationsTable = DefaultMigrationsTable | ||
} | ||
|
||
lockTable := purl.Query().Get("x-lock-table") | ||
if len(lockTable) == 0 { | ||
lockTable = DefaultLockTable | ||
} | ||
|
||
px, err := WithInstance(db, &Config{ | ||
DatabaseName: purl.Path, | ||
MigrationsTable: migrationsTable, | ||
LockTable: lockTable, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return px, nil | ||
} | ||
|
||
func (c *CockroachDb) Close() error { | ||
return c.db.Close() | ||
} | ||
|
||
// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed | ||
// See: https://github.com/cockroachdb/cockroach/issues/13546 | ||
func (c *CockroachDb) Lock() error { | ||
err := crdb.ExecuteTx(c.db, func(tx *sql.Tx) error { | ||
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1" | ||
rows, err := tx.Query(query, aid) | ||
if err != nil { | ||
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} | ||
} | ||
defer rows.Close() | ||
|
||
// If row exists at all, lock is present | ||
locked := rows.Next() | ||
if locked { | ||
return database.Error{Err: "lock could not be acquired; already locked", Query: []byte(query)} | ||
} | ||
|
||
query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)" | ||
if _, err := tx.Exec(query, aid) ; err != nil { | ||
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} | ||
} | ||
|
||
return nil | ||
}) | ||
|
||
if err != nil { | ||
return err | ||
} else { | ||
c.isLocked = true | ||
return nil | ||
} | ||
} | ||
|
||
// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed | ||
// See: https://github.com/cockroachdb/cockroach/issues/13546 | ||
func (c *CockroachDb) Unlock() error { | ||
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until | ||
// a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances | ||
query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1" | ||
if _, err := c.db.Exec(query, aid); err != nil { | ||
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} | ||
} | ||
|
||
c.isLocked = false | ||
return nil | ||
} | ||
|
||
func (c *CockroachDb) Run(migration io.Reader) error { | ||
migr, err := ioutil.ReadAll(migration) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// run migration | ||
query := string(migr[:]) | ||
if _, err := c.db.Exec(query); err != nil { | ||
return database.Error{OrigErr: err, Err: "migration failed", Query: migr} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *CockroachDb) SetVersion(version int, dirty bool) error { | ||
return crdb.ExecuteTx(c.db, func(tx *sql.Tx) error { | ||
if _, err := tx.Exec( `TRUNCATE "` + c.config.MigrationsTable + `"`); err != nil { | ||
return err | ||
} | ||
|
||
if version >= 0 { | ||
if _, err := tx.Exec(`INSERT INTO "` + c.config.MigrationsTable + `" (version, dirty) VALUES ($1, $2)`, version, dirty); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
func (c *CockroachDb) Version() (version int, dirty bool, err error) { | ||
query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" LIMIT 1` | ||
err = c.db.QueryRow(query).Scan(&version, &dirty) | ||
|
||
switch { | ||
case err == sql.ErrNoRows: | ||
return database.NilVersion, false, nil | ||
|
||
case err != nil: | ||
if e, ok := err.(*pq.Error); ok { | ||
// 42P01 is "UndefinedTableError" in CockroachDB | ||
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go | ||
if e.Code == "42P01" { | ||
return database.NilVersion, false, nil | ||
} | ||
} | ||
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} | ||
|
||
default: | ||
return version, dirty, nil | ||
} | ||
} | ||
|
||
func (c *CockroachDb) Drop() error { | ||
// select all tables in current schema | ||
query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema())` | ||
tables, err := c.db.Query(query) | ||
if err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
defer tables.Close() | ||
|
||
// delete one table after another | ||
tableNames := make([]string, 0) | ||
for tables.Next() { | ||
var tableName string | ||
if err := tables.Scan(&tableName); err != nil { | ||
return err | ||
} | ||
if len(tableName) > 0 { | ||
tableNames = append(tableNames, tableName) | ||
} | ||
} | ||
|
||
if len(tableNames) > 0 { | ||
// delete one by one ... | ||
for _, t := range tableNames { | ||
query = `DROP TABLE IF EXISTS ` + t + ` CASCADE` | ||
if _, err := c.db.Exec(query); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
} | ||
if err := c.ensureVersionTable(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *CockroachDb) ensureVersionTable() error { | ||
// check if migration table exists | ||
var count int | ||
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1` | ||
if err := c.db.QueryRow(query, c.config.MigrationsTable).Scan(&count); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
if count == 1 { | ||
return nil | ||
} | ||
|
||
// if not, create the empty migration table | ||
query = `CREATE TABLE "` + c.config.MigrationsTable + `" (version INT NOT NULL PRIMARY KEY, dirty BOOL NOT NULL)` | ||
if _, err := c.db.Exec(query); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
return nil | ||
} | ||
|
||
|
||
func (c *CockroachDb) ensureLockTable() error { | ||
// check if lock table exists | ||
var count int | ||
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1` | ||
if err := c.db.QueryRow(query, c.config.LockTable).Scan(&count); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
if count == 1 { | ||
return nil | ||
} | ||
|
||
// if not, create the empty lock table | ||
query = `CREATE TABLE "` + c.config.LockTable + `" (lock_id INT NOT NULL PRIMARY KEY)` | ||
if _, err := c.db.Exec(query); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.