Skip to content

Commit

Permalink
[/v4] - Add session level advisory lock (postgres only) (#493)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfridman committed Apr 2, 2023
1 parent 97dd4b6 commit 620d17b
Show file tree
Hide file tree
Showing 18 changed files with 527 additions and 44 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
# Local testing
.envrc
*.FAIL
/scripts
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ docker-start-postgres:
-l goose_test \
postgres:14-alpine -c log_statement=all

comments:
todo:
rg --type go --ignore-case '//.*(todo|feat)\('

gh-links:
Expand Down
17 changes: 13 additions & 4 deletions apply_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"database/sql"
"errors"
"fmt"

"github.com/pressly/goose/v4/internal/sqlparser"
"go.uber.org/multierr"
)

// ApplyVersion applies exactly one migration at the specified version. If a migration cannot be
Expand All @@ -14,17 +16,24 @@ import (
//
// If the direction is true, the migration will be applied. If the direction is false, the migration
// will be rolled back.
func (p *Provider) ApplyVersion(ctx context.Context, version int64, direction bool) (*MigrationResult, error) {
func (p *Provider) ApplyVersion(ctx context.Context, version int64, direction bool) (_ *MigrationResult, retErr error) {
if version < 1 {
return nil, fmt.Errorf("invalid version: %d", version)
}

m, err := p.getMigration(version)
if err != nil {
return nil, err
}
conn, err := p.db.Conn(ctx)

conn, cleanup, err := p.initialize(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

defer func() {
retErr = multierr.Append(retErr, cleanup())
}()
// Ensure version table exists.
if err := p.ensureVersionTable(ctx, conn); err != nil {
return nil, err
}
Expand Down
17 changes: 8 additions & 9 deletions down.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/pressly/goose/v4/internal/sqlparser"
"go.uber.org/multierr"
)

// Down rolls back the most recently applied migration.
Expand All @@ -30,21 +31,19 @@ func (p *Provider) DownTo(ctx context.Context, version int64) ([]*MigrationResul
return p.down(ctx, false, version)
}

func (p *Provider) down(ctx context.Context, downByOne bool, version int64) ([]*MigrationResult, error) {
func (p *Provider) down(ctx context.Context, downByOne bool, version int64) (_ []*MigrationResult, retErr error) {
if version < 0 {
return nil, fmt.Errorf("version must be a number greater than or equal zero: %d", version)
}

conn, err := p.db.Conn(ctx)
conn, cleanup, err := p.initialize(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

// feat(mf): this is where a session level advisory lock would be acquired to ensure that only
// one goose process is running at a time. Also need to lock the Provider itself with a mutex.
// https://github.com/pressly/goose/issues/335

defer func() {
retErr = multierr.Append(retErr, cleanup())
}()
// Ensure version table exists.
if err := p.ensureVersionTable(ctx, conn); err != nil {
return nil, err
}
Expand All @@ -67,7 +66,7 @@ func (p *Provider) down(ctx context.Context, downByOne bool, version int64) ([]*
return nil, err
}
if dbMigrations[0].Version == 0 {
return nil, ErrNoCurrentVersion
return nil, nil
}

// This is the sequential path.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-sql-driver/mysql v1.7.0
github.com/jackc/pgx/v5 v5.3.1
github.com/ory/dockertest/v3 v3.9.1
github.com/sethvargo/go-retry v0.2.4
github.com/vertica/vertica-sql-go v1.3.1
github.com/ziutek/mymysql v1.5.4
go.uber.org/multierr v1.10.0
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.7.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
Expand Down Expand Up @@ -212,6 +214,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
20 changes: 20 additions & 0 deletions internal/dialectadapter/dialectquery/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,23 @@ func (p *Postgres) ListMigrations() string {
q := `SELECT version_id, is_applied from %s ORDER BY id DESC`
return fmt.Sprintf(q, p.Table)
}

// AdvisoryLockSession returns the query to lock the database using an exclusive session level
// advisory lock.
func (p *Postgres) AdvisoryLockSession() string {
return `SELECT pg_advisory_lock($1)`
}

// AdvisoryUnlockSession returns the query to release an exclusive session level advisory lock.
func (p *Postgres) AdvisoryUnlockSession() string {
return `SELECT pg_advisory_unlock($1)`
}

// AdvisoryLockTransaction returns the query to lock the database using an exclusive transaction
// level advisory lock.
//
// The lock is automatically released at the end of the current transaction and cannot be released
// explicitly.
func (p *Postgres) AdvisoryLockTransaction() string {
return `SELECT pg_advisory_xact_lock($1)`
}
125 changes: 125 additions & 0 deletions internal/dialectadapter/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package dialectadapter

import (
"context"
"database/sql"
"errors"
"hash/crc64"

"github.com/pressly/goose/v4/internal/dialectadapter/dialectquery"
"github.com/sethvargo/go-retry"
)

var (
// defaultLockID is the id used to lock the database for migrations. It is a crc64 hash of the
// string "goose". This is used to ensure that the lock is unique to goose.
//
// 5887940537704921958
defaultLockID = crc64.Checksum([]byte("goose"), crc64.MakeTable(crc64.ECMA))

// ErrLockNotImplemented is returned when the database does not support locking.
ErrLockNotImplemented = errors.New("lock not implemented")
)

// Locker defines the methods to lock and unlock the database.
//
// Locking is an experimental feature and the underlying implementation may change in the future.
//
// The only database that currently supports locking is Postgres. Other databases will return
// ErrLockNotImplemented.
type Locker interface {
// IsSupported returns true if the database supports locking.
IsSupported() bool

// LockSession and UnlockSession are used to lock the database for the duration of a session.
//
// The session is defined as the duration of a single connection.
LockSession(ctx context.Context, conn *sql.Conn) error
UnlockSession(ctx context.Context, conn *sql.Conn) error

// LockTransaction is used to lock the database for the duration of a transaction.
LockTransaction(ctx context.Context, tx *sql.Tx) error
}

func (s *store) IsSupported() bool {
switch s.querier.(type) {
case *dialectquery.Postgres:
return true
default:
return false
}
}

func (s *store) LockSession(ctx context.Context, conn *sql.Conn) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
// TODO(mf): need to be VERY careful about the retry logic here to avoid stacking locks on
// top of each other. We need to make sure that we only retry if the lock is not already
// held.
//
// This retry is a bit pointless because if we can't get the lock, chances are another
// process is holding the lock and we will just spin here forever. We should probably just
// remove this retry.
//
// At best this might help with a transient network issue.
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
if _, err := conn.ExecContext(ctx, t.AdvisoryLockSession(), defaultLockID); err != nil {
return retry.RetryableError(err)
}
return nil
})
}
return ErrLockNotImplemented
}

func (s *store) UnlockSession(ctx context.Context, conn *sql.Conn) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
var unlocked bool
row := conn.QueryRowContext(ctx, t.AdvisoryUnlockSession(), defaultLockID)
if err := row.Scan(&unlocked); err != nil {
return retry.RetryableError(err)
}
if !unlocked {

// TODO(mf): provide the user with some documentation on how they can unlock the
// session manually. Although this is probably an issue for 99.9% of users
// since pg_advisory_unlock_all() will release all session level advisory locks held
// by the current session. (This function is implicitly invoked at session end, even
// if the client disconnects ungracefully.)
//
// TODO(mf): - we may not want to bother checking the return value and just assume
// that the lock was released. This would simplify the code and remove the need for
// the unlocked bool.
//
// SELECT pid,granted,((classid::bigint<<32)|objid::bigint)AS goose_lock_id FROM
// pg_locks WHERE locktype='advisory';
//
// | pid | granted | goose_lock_id |
// |-----|---------|---------------------|
// | 191 | t | 5887940537704921958 |
//
// A more forceful way to unlock the session is to terminate the process: SELECT
// pg_terminate_backend(120);

return errors.New("failed to unlock session")
}
return nil
})
}
return ErrLockNotImplemented
}

func (s *store) LockTransaction(ctx context.Context, tx *sql.Tx) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
if _, err := tx.ExecContext(ctx, t.AdvisoryLockTransaction(), defaultLockID); err != nil {
return retry.RetryableError(err)
}
return nil
})
}
return ErrLockNotImplemented
}
14 changes: 13 additions & 1 deletion internal/dialectadapter/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/pressly/goose/v4/internal/dialectadapter/dialectquery"
"github.com/sethvargo/go-retry"
)

// EXPERIMENTAL: This is an experimental feature and may change in the future.
Expand Down Expand Up @@ -47,6 +48,10 @@ type Store interface {
// If there are no migrations, an empty slice is returned with no error.
ListMigrationsConn(ctx context.Context, conn *sql.Conn) ([]*ListMigrationsResult, error)
ListMigrations(ctx context.Context, db *sql.DB) ([]*ListMigrationsResult, error)

// Locker defines the methods for locking the database. Some databases
// do not support locking, in which case ErrLockNotImplemented is returned.
Locker
}

// NewStore returns a new Store for the given dialect.
Expand Down Expand Up @@ -77,7 +82,13 @@ func NewStore(d Dialect, table string) (Store, error) {
default:
return nil, fmt.Errorf("unknown querier dialect: %v", d)
}
return &store{querier: querier}, nil
r := retry.NewFibonacci(1 * time.Second)
r = retry.WithCappedDuration(5*time.Second, r)
r = retry.WithMaxDuration(30*time.Second, r)
return &store{
querier: querier,
retry: r,
}, nil
}

type GetMigrationResult struct {
Expand All @@ -92,6 +103,7 @@ type ListMigrationsResult struct {

type store struct {
querier dialectquery.Querier
retry retry.Backoff
}

var _ Store = (*store)(nil)
Expand Down
36 changes: 34 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type Options struct {
Filesystem fs.FS

// Commonly modified options.
Logger Logger
Verbose bool
Logger Logger
Verbose bool
LockMode LockMode

// Features.
AllowMissing bool
Expand Down Expand Up @@ -134,3 +135,34 @@ func (o Options) SetExcludeFilenames(filenames ...string) Options {
o.ExcludeFilenames = filenames
return o
}

type LockMode int

const (
LockModeNone LockMode = iota
LockModeAdvisorySession
LockModeAdvisoryTransaction
)

func (l LockMode) String() string {
switch l {
case LockModeNone:
return "none"
case LockModeAdvisorySession:
return "advisory-session"
case LockModeAdvisoryTransaction:
return "advisory-transaction"
default:
return "unknown"
}
}

// SetLockMode returns a new Options value with LockMode set to the given value. LockMode is the
// locking mode to use when applying migrations. Locking is used to prevent multiple instances of
// goose from applying migrations concurrently.
//
// Default: LockModeNone
func (o Options) SetLockMode(m LockMode) Options {
o.LockMode = m
return o
}
9 changes: 9 additions & 0 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func (p *Provider) ListMigrations() []*Migration {
return migrations
}

// GetLastVersion returns the version of the last migration found in the migrations directory
// (sorted by version). If there are no migrations, then 0 is returned.
func (p *Provider) GetLastVersion() int64 {
if len(p.migrations) == 0 {
return 0
}
return p.migrations[len(p.migrations)-1].version
}

// Ping attempts to ping the database to verify a connection is available.
func (p *Provider) Ping(ctx context.Context) error {
return p.db.PingContext(ctx)
Expand Down

0 comments on commit 620d17b

Please sign in to comment.