Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[/v4] - Add session level advisory lock (postgres only) #493

Merged
merged 6 commits into from
Apr 2, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
# Local testing
.envrc
*.FAIL
/scripts
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ docker-start-postgres:
-l goose_test \
postgres:14-alpine -c log_statement=all

comments:
todo:
rg --type go --ignore-case '//.*(todo|feat)\('

gh-links:
Expand Down
17 changes: 13 additions & 4 deletions apply_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"database/sql"
"errors"
"fmt"

"github.com/pressly/goose/v4/internal/sqlparser"
"go.uber.org/multierr"
)

// ApplyVersion applies exactly one migration at the specified version. If a migration cannot be
Expand All @@ -14,17 +16,24 @@ import (
//
// If the direction is true, the migration will be applied. If the direction is false, the migration
// will be rolled back.
func (p *Provider) ApplyVersion(ctx context.Context, version int64, direction bool) (*MigrationResult, error) {
func (p *Provider) ApplyVersion(ctx context.Context, version int64, direction bool) (_ *MigrationResult, retErr error) {
if version < 1 {
return nil, fmt.Errorf("invalid version: %d", version)
}

m, err := p.getMigration(version)
if err != nil {
return nil, err
}
conn, err := p.db.Conn(ctx)

conn, cleanup, err := p.initialize(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

defer func() {
retErr = multierr.Append(retErr, cleanup())
}()
// Ensure version table exists.
if err := p.ensureVersionTable(ctx, conn); err != nil {
return nil, err
}
Expand Down
17 changes: 8 additions & 9 deletions down.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/pressly/goose/v4/internal/sqlparser"
"go.uber.org/multierr"
)

// Down rolls back the most recently applied migration.
Expand All @@ -30,21 +31,19 @@ func (p *Provider) DownTo(ctx context.Context, version int64) ([]*MigrationResul
return p.down(ctx, false, version)
}

func (p *Provider) down(ctx context.Context, downByOne bool, version int64) ([]*MigrationResult, error) {
func (p *Provider) down(ctx context.Context, downByOne bool, version int64) (_ []*MigrationResult, retErr error) {
if version < 0 {
return nil, fmt.Errorf("version must be a number greater than or equal zero: %d", version)
}

conn, err := p.db.Conn(ctx)
conn, cleanup, err := p.initialize(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

// feat(mf): this is where a session level advisory lock would be acquired to ensure that only
// one goose process is running at a time. Also need to lock the Provider itself with a mutex.
// https://github.com/pressly/goose/issues/335

defer func() {
retErr = multierr.Append(retErr, cleanup())
}()
// Ensure version table exists.
if err := p.ensureVersionTable(ctx, conn); err != nil {
return nil, err
}
Expand All @@ -67,7 +66,7 @@ func (p *Provider) down(ctx context.Context, downByOne bool, version int64) ([]*
return nil, err
}
if dbMigrations[0].Version == 0 {
return nil, ErrNoCurrentVersion
return nil, nil
}

// This is the sequential path.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-sql-driver/mysql v1.7.0
github.com/jackc/pgx/v5 v5.3.1
github.com/ory/dockertest/v3 v3.9.1
github.com/sethvargo/go-retry v0.2.4
github.com/vertica/vertica-sql-go v1.3.1
github.com/ziutek/mymysql v1.5.4
go.uber.org/multierr v1.10.0
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.7.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
Expand Down Expand Up @@ -212,6 +214,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
20 changes: 20 additions & 0 deletions internal/dialectadapter/dialectquery/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,23 @@ func (p *Postgres) ListMigrations() string {
q := `SELECT version_id, is_applied from %s ORDER BY id DESC`
return fmt.Sprintf(q, p.Table)
}

// AdvisoryLockSession returns the query to lock the database using an exclusive session level
// advisory lock.
func (p *Postgres) AdvisoryLockSession() string {
return `SELECT pg_advisory_lock($1)`
}

// AdvisoryUnlockSession returns the query to release an exclusive session level advisory lock.
func (p *Postgres) AdvisoryUnlockSession() string {
return `SELECT pg_advisory_unlock($1)`
}

// AdvisoryLockTransaction returns the query to lock the database using an exclusive transaction
// level advisory lock.
//
// The lock is automatically released at the end of the current transaction and cannot be released
// explicitly.
func (p *Postgres) AdvisoryLockTransaction() string {
return `SELECT pg_advisory_xact_lock($1)`
}
125 changes: 125 additions & 0 deletions internal/dialectadapter/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package dialectadapter

import (
"context"
"database/sql"
"errors"
"hash/crc64"

"github.com/pressly/goose/v4/internal/dialectadapter/dialectquery"
"github.com/sethvargo/go-retry"
)

var (
// defaultLockID is the id used to lock the database for migrations. It is a crc64 hash of the
// string "goose". This is used to ensure that the lock is unique to goose.
//
// 5887940537704921958
defaultLockID = crc64.Checksum([]byte("goose"), crc64.MakeTable(crc64.ECMA))

// ErrLockNotImplemented is returned when the database does not support locking.
ErrLockNotImplemented = errors.New("lock not implemented")
)

// Locker defines the methods to lock and unlock the database.
//
// Locking is an experimental feature and the underlying implementation may change in the future.
//
// The only database that currently supports locking is Postgres. Other databases will return
// ErrLockNotImplemented.
type Locker interface {
// IsSupported returns true if the database supports locking.
IsSupported() bool

// LockSession and UnlockSession are used to lock the database for the duration of a session.
//
// The session is defined as the duration of a single connection.
LockSession(ctx context.Context, conn *sql.Conn) error
UnlockSession(ctx context.Context, conn *sql.Conn) error

// LockTransaction is used to lock the database for the duration of a transaction.
LockTransaction(ctx context.Context, tx *sql.Tx) error
}

func (s *store) IsSupported() bool {
switch s.querier.(type) {
case *dialectquery.Postgres:
return true
default:
return false
}
}

func (s *store) LockSession(ctx context.Context, conn *sql.Conn) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
// TODO(mf): need to be VERY careful about the retry logic here to avoid stacking locks on
// top of each other. We need to make sure that we only retry if the lock is not already
// held.
//
// This retry is a bit pointless because if we can't get the lock, chances are another
// process is holding the lock and we will just spin here forever. We should probably just
// remove this retry.
//
// At best this might help with a transient network issue.
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
if _, err := conn.ExecContext(ctx, t.AdvisoryLockSession(), defaultLockID); err != nil {
return retry.RetryableError(err)
}
return nil
})
}
return ErrLockNotImplemented
}

func (s *store) UnlockSession(ctx context.Context, conn *sql.Conn) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
var unlocked bool
row := conn.QueryRowContext(ctx, t.AdvisoryUnlockSession(), defaultLockID)
if err := row.Scan(&unlocked); err != nil {
return retry.RetryableError(err)
}
if !unlocked {

// TODO(mf): provide the user with some documentation on how they can unlock the
// session manually. Although this is probably an issue for 99.9% of users
// since pg_advisory_unlock_all() will release all session level advisory locks held
// by the current session. (This function is implicitly invoked at session end, even
// if the client disconnects ungracefully.)
//
// TODO(mf): - we may not want to bother checking the return value and just assume
// that the lock was released. This would simplify the code and remove the need for
// the unlocked bool.
//
// SELECT pid,granted,((classid::bigint<<32)|objid::bigint)AS goose_lock_id FROM
// pg_locks WHERE locktype='advisory';
//
// | pid | granted | goose_lock_id |
// |-----|---------|---------------------|
// | 191 | t | 5887940537704921958 |
//
// A more forceful way to unlock the session is to terminate the process: SELECT
// pg_terminate_backend(120);

return errors.New("failed to unlock session")
}
return nil
})
}
return ErrLockNotImplemented
}

func (s *store) LockTransaction(ctx context.Context, tx *sql.Tx) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
if _, err := tx.ExecContext(ctx, t.AdvisoryLockTransaction(), defaultLockID); err != nil {
return retry.RetryableError(err)
}
return nil
})
}
return ErrLockNotImplemented
}
14 changes: 13 additions & 1 deletion internal/dialectadapter/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/pressly/goose/v4/internal/dialectadapter/dialectquery"
"github.com/sethvargo/go-retry"
)

// EXPERIMENTAL: This is an experimental feature and may change in the future.
Expand Down Expand Up @@ -47,6 +48,10 @@ type Store interface {
// If there are no migrations, an empty slice is returned with no error.
ListMigrationsConn(ctx context.Context, conn *sql.Conn) ([]*ListMigrationsResult, error)
ListMigrations(ctx context.Context, db *sql.DB) ([]*ListMigrationsResult, error)

// Locker defines the methods for locking the database. Some databases
// do not support locking, in which case ErrLockNotImplemented is returned.
Locker
}

// NewStore returns a new Store for the given dialect.
Expand Down Expand Up @@ -77,7 +82,13 @@ func NewStore(d Dialect, table string) (Store, error) {
default:
return nil, fmt.Errorf("unknown querier dialect: %v", d)
}
return &store{querier: querier}, nil
r := retry.NewFibonacci(1 * time.Second)
r = retry.WithCappedDuration(5*time.Second, r)
r = retry.WithMaxDuration(30*time.Second, r)
return &store{
querier: querier,
retry: r,
}, nil
}

type GetMigrationResult struct {
Expand All @@ -92,6 +103,7 @@ type ListMigrationsResult struct {

type store struct {
querier dialectquery.Querier
retry retry.Backoff
}

var _ Store = (*store)(nil)
Expand Down
39 changes: 37 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type Options struct {
Filesystem fs.FS

// Commonly modified options.
Logger Logger
Verbose bool
Logger Logger
Verbose bool
LockMode LockMode

// Features.
AllowMissing bool
Expand Down Expand Up @@ -134,3 +135,37 @@ func (o Options) SetExcludeFilenames(filenames ...string) Options {
o.ExcludeFilenames = filenames
return o
}

type LockMode int

const (
LockModeNone LockMode = iota
LockModeAdvisorySession
LockModeAdvisoryTransaction
LockModeFile
)

func (l LockMode) String() string {
switch l {
case LockModeNone:
return "none"
case LockModeAdvisorySession:
return "advisory-session"
case LockModeAdvisoryTransaction:
return "advisory-transaction"
case LockModeFile:
return "file"
default:
return "unknown"
}
}

// SetLockMode returns a new Options value with LockMode set to the given value. LockMode is the
// locking mode to use when applying migrations. Locking is used to prevent multiple instances of
// goose from applying migrations concurrently.
//
// Default: LockModeNone
func (o Options) SetLockMode(m LockMode) Options {
o.LockMode = m
return o
}
9 changes: 9 additions & 0 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func (p *Provider) ListMigrations() []*Migration {
return migrations
}

// GetLastVersion returns the version of the last migration found in the migrations directory
// (sorted by version). If there are no migrations, then 0 is returned.
func (p *Provider) GetLastVersion() int64 {
if len(p.migrations) == 0 {
return 0
}
return p.migrations[len(p.migrations)-1].version
}

// Ping attempts to ping the database to verify a connection is available.
func (p *Provider) Ping(ctx context.Context) error {
return p.db.PingContext(ctx)
Expand Down