Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[/v4] - Add session level advisory lock (postgres only) #493

Merged
merged 6 commits into from
Apr 2, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ docker-start-postgres:
-l goose_test \
postgres:14-alpine -c log_statement=all

comments:
todo:
rg --type go --ignore-case '//.*(todo|feat)\('

gh-links:
Expand Down
17 changes: 13 additions & 4 deletions apply_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"database/sql"
"errors"
"fmt"

"github.com/pressly/goose/v4/internal/sqlparser"
"go.uber.org/multierr"
)

// ApplyVersion applies exactly one migration at the specified version. If a migration cannot be
Expand All @@ -14,17 +16,24 @@ import (
//
// If the direction is true, the migration will be applied. If the direction is false, the migration
// will be rolled back.
func (p *Provider) ApplyVersion(ctx context.Context, version int64, direction bool) (*MigrationResult, error) {
func (p *Provider) ApplyVersion(ctx context.Context, version int64, direction bool) (_ *MigrationResult, retErr error) {
if version < 1 {
return nil, fmt.Errorf("invalid version: %d", version)
}

m, err := p.getMigration(version)
if err != nil {
return nil, err
}
conn, err := p.db.Conn(ctx)

conn, cleanup, err := p.initialize(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

defer func() {
retErr = multierr.Append(retErr, cleanup())
}()
// Ensure version table exists.
if err := p.ensureVersionTable(ctx, conn); err != nil {
return nil, err
}
Expand Down
17 changes: 8 additions & 9 deletions down.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/pressly/goose/v4/internal/sqlparser"
"go.uber.org/multierr"
)

// Down rolls back the most recently applied migration.
Expand All @@ -30,21 +31,19 @@ func (p *Provider) DownTo(ctx context.Context, version int64) ([]*MigrationResul
return p.down(ctx, false, version)
}

func (p *Provider) down(ctx context.Context, downByOne bool, version int64) ([]*MigrationResult, error) {
func (p *Provider) down(ctx context.Context, downByOne bool, version int64) (_ []*MigrationResult, retErr error) {
if version < 0 {
return nil, fmt.Errorf("version must be a number greater than or equal zero: %d", version)
}

conn, err := p.db.Conn(ctx)
conn, cleanup, err := p.initialize(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

// feat(mf): this is where a session level advisory lock would be acquired to ensure that only
// one goose process is running at a time. Also need to lock the Provider itself with a mutex.
// https://github.com/pressly/goose/issues/335

defer func() {
retErr = multierr.Append(retErr, cleanup())
}()
// Ensure version table exists.
if err := p.ensureVersionTable(ctx, conn); err != nil {
return nil, err
}
Expand All @@ -67,7 +66,7 @@ func (p *Provider) down(ctx context.Context, downByOne bool, version int64) ([]*
return nil, err
}
if dbMigrations[0].Version == 0 {
return nil, ErrNoCurrentVersion
return nil, nil
}

// This is the sequential path.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-sql-driver/mysql v1.7.0
github.com/jackc/pgx/v5 v5.3.1
github.com/ory/dockertest/v3 v3.9.1
github.com/sethvargo/go-retry v0.2.4
github.com/vertica/vertica-sql-go v1.3.1
github.com/ziutek/mymysql v1.5.4
go.uber.org/multierr v1.10.0
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.7.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
Expand Down Expand Up @@ -212,6 +214,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
21 changes: 21 additions & 0 deletions internal/dialectadapter/dialectquery/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,24 @@ func (p *Postgres) ListMigrations() string {
q := `SELECT version_id, is_applied from %s ORDER BY id DESC`
return fmt.Sprintf(q, p.Table)
}

// AdvisoryLockSession returns the query to lock the database using an exclusive
// session level advisory lock.
func (p *Postgres) AdvisoryLockSession() string {
return `SELECT pg_advisory_lock($1)`
}

// AdvisoryUnlockSession returns the query to release an exclusive session level
// advisory lock.
func (p *Postgres) AdvisoryUnlockSession() string {
return `SELECT pg_advisory_unlock($1)`
}

// AdvisoryLockTransaction returns the query to lock the database using an exclusive
// transaction level advisory lock.
//
// The lock is automatically released at the end of the current transaction and cannot
// be released explicitly.
func (p *Postgres) AdvisoryLockTransaction() string {
return `SELECT pg_advisory_xact_lock($1)`
}
128 changes: 128 additions & 0 deletions internal/dialectadapter/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package dialectadapter

import (
"context"
"database/sql"
"errors"
"hash/crc64"

"github.com/pressly/goose/v4/internal/dialectadapter/dialectquery"
"github.com/sethvargo/go-retry"
)

var (
// defaultLockID is the id used to lock the database for migrations. It is a
// crc64 hash of the string "goose". This is used to prevent multiple
// goose processes from running migrations at the same time.
//
// 5887940537704921958
defaultLockID = crc64.Checksum([]byte("goose"), crc64.MakeTable(crc64.ECMA))

// ErrLockNotImplemented is returned when the database does not support locking.
ErrLockNotImplemented = errors.New("lock not implemented")
)

// Locker defines the methods to lock and unlock the database.
//
// Locking is an experimental feature and the underlying implementation
// may change in the future.
//
// The only database that currently supports locking is Postgres.
//
// Other databases will return ErrLockNotImplemented.
type Locker interface {
// IsLockingEnabled returns true if the database supports locking.
IsLockingEnabled() bool

// LockSession and UnlockSession are used to lock the database for the
// duration of a session.
//
// The session is defined as the duration of a single connection.
LockSession(ctx context.Context, conn *sql.Conn) error
UnlockSession(ctx context.Context, conn *sql.Conn) error

// LockTransaction is used to lock the database for the duration of a
// transaction.
LockTransaction(ctx context.Context, tx *sql.Tx) error
}

func (s *store) IsLockingEnabled() bool {
switch s.querier.(type) {
case *dialectquery.Postgres:
return true
default:
return false
}
}

func (s *store) LockSession(ctx context.Context, conn *sql.Conn) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
// TODO(mf): need to be VERY careful about the retry logic here to avoid
// stacking locks on top of each other. We need to make sure that we
// only retry if the lock is not already held.
//
// This retry is a bit pointless because if we can't get the lock, chances
// are another process is holding the lock and we will just spin here
// forever. We should probably just remove this retry.
//
// At best this might help with a transient network issue.
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
if _, err := conn.ExecContext(ctx, t.AdvisoryLockSession(), defaultLockID); err != nil {
return retry.RetryableError(err)
}
return nil
})
}
return ErrLockNotImplemented
}

func (s *store) UnlockSession(ctx context.Context, conn *sql.Conn) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
var unlocked bool
row := conn.QueryRowContext(ctx, t.AdvisoryUnlockSession(), defaultLockID)
if err := row.Scan(&unlocked); err != nil {
return retry.RetryableError(err)
}
if !unlocked {
// TODO(mf): provide the user with some documentation on how they can unlock
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be added to the documentation as a gotcha, regardless of how edge case this is, when using session-level advisory locks.

// the session manually. Although this is probably not be an issue for 99.9%
// of users since pg_advisory_unlock_all() will release all session level
// advisory locks held by the current session. (This function is implicitly
// invoked at session end, even if the client disconnects ungracefully.)
//
// TODO(mf): - we may not want to bother checking the return value and just
// assume that the lock was released. This would simplify the code and
// remove the need for the unlocked bool.
//
// SELECT pid,granted,((classid::bigint<<32)|objid::bigint)AS goose_lock_id FROM pg_locks WHERE locktype='advisory';
//
// | pid | granted | goose_lock_id |
// |-----|---------|---------------------|
// | 191 | t | 5887940537704921958 |
//
// A more forceful way to unlock the session is to terminate the process:
// SELECT pg_terminate_backend(120);

return errors.New("failed to unlock session")
}
return nil
})
}
return ErrLockNotImplemented
}

func (s *store) LockTransaction(ctx context.Context, tx *sql.Tx) error {
switch t := s.querier.(type) {
case *dialectquery.Postgres:
return retry.Do(ctx, s.retry, func(ctx context.Context) error {
if _, err := tx.ExecContext(ctx, t.AdvisoryLockTransaction(), defaultLockID); err != nil {
return retry.RetryableError(err)
}
return nil
})
}
return ErrLockNotImplemented
}
14 changes: 13 additions & 1 deletion internal/dialectadapter/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/pressly/goose/v4/internal/dialectadapter/dialectquery"
"github.com/sethvargo/go-retry"
)

// EXPERIMENTAL: This is an experimental feature and may change in the future.
Expand Down Expand Up @@ -47,6 +48,10 @@ type Store interface {
// If there are no migrations, an empty slice is returned with no error.
ListMigrationsConn(ctx context.Context, conn *sql.Conn) ([]*ListMigrationsResult, error)
ListMigrations(ctx context.Context, db *sql.DB) ([]*ListMigrationsResult, error)

// Locker defines the methods for locking the database. Some databases
// do not support locking, in which case ErrLockNotImplemented is returned.
Locker
}

// NewStore returns a new Store for the given dialect.
Expand Down Expand Up @@ -77,7 +82,13 @@ func NewStore(d Dialect, table string) (Store, error) {
default:
return nil, fmt.Errorf("unknown querier dialect: %v", d)
}
return &store{querier: querier}, nil
r := retry.NewFibonacci(1 * time.Second)
r = retry.WithCappedDuration(5*time.Second, r)
r = retry.WithMaxDuration(30*time.Second, r)
return &store{
querier: querier,
retry: r,
}, nil
}

type GetMigrationResult struct {
Expand All @@ -92,6 +103,7 @@ type ListMigrationsResult struct {

type store struct {
querier dialectquery.Querier
retry retry.Backoff
}

var _ Store = (*store)(nil)
Expand Down
39 changes: 37 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type Options struct {
Filesystem fs.FS

// Commonly modified options.
Logger Logger
Verbose bool
Logger Logger
Verbose bool
LockMode LockMode

// Features.
AllowMissing bool
Expand Down Expand Up @@ -134,3 +135,37 @@ func (o Options) SetExcludeFilenames(filenames ...string) Options {
o.ExcludeFilenames = filenames
return o
}

type LockMode int

const (
LockModeNone LockMode = iota
LockModeAdvisorySession
LockModeAdvisoryTransaction
LockModeFile
)

func (l LockMode) String() string {
switch l {
case LockModeNone:
return "none"
case LockModeAdvisorySession:
return "advisory-session"
case LockModeAdvisoryTransaction:
return "advisory-transaction"
case LockModeFile:
return "file"
default:
return "unknown"
}
}

// SetLockMode returns a new Options value with LockMode set to the given value. LockMode is the
// locking mode to use when applying migrations. Locking is used to prevent multiple instances of
// goose from applying migrations concurrently.
//
// Default: LockModeNone
func (o Options) SetLockMode(m LockMode) Options {
o.LockMode = m
return o
}