Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await schema agreement during migrations #133

Merged
merged 2 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/scylladb/gocqlx

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gocql/gocql v0.0.0-20200103014340-68f928edb90a
github.com/gocql/gocql v0.0.0-20200131111108-92af2e088537
github.com/golang/snappy v0.0.1 // indirect
github.com/google/go-cmp v0.2.0
github.com/scylladb/go-reflectx v1.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gocql/gocql v0.0.0-20200103014340-68f928edb90a h1:f/7VP2EmdQagG92I/75YnM3ZIeCYa61BT2kZoJFptHM=
github.com/gocql/gocql v0.0.0-20200103014340-68f928edb90a/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20200131111108-92af2e088537 h1:NaMut1fdw76YYX/TPinSAbai4DShF5tPort3bHpET6g=
github.com/gocql/gocql v0.0.0-20200131111108-92af2e088537/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
Expand Down
36 changes: 36 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ import (
"github.com/scylladb/gocqlx/qb"
)

// DefaultAwaitSchemaAgreement controls whether checking for cluster schema agreement
// is disabled or if it is checked before each file or statement is applied.
// The default is not checking before each file or statement but only once after every
// migration has been run.
var DefaultAwaitSchemaAgreement = AwaitSchemaAgreementDisabled

type awaitSchemaAgreement int

// Options for checking schema agreement.
const (
AwaitSchemaAgreementDisabled awaitSchemaAgreement = iota
AwaitSchemaAgreementBeforeEachFile
AwaitSchemaAgreementBeforeEachStatement
)

// ShouldAwait decides whether to await schema agreement for the configured DefaultAwaitSchemaAgreement option above.
func (as awaitSchemaAgreement) ShouldAwait(stage awaitSchemaAgreement) bool {
return as == stage
}

const (
infoSchema = `CREATE TABLE IF NOT EXISTS gocqlx_migrate (
name text,
Expand Down Expand Up @@ -117,6 +137,10 @@ func Migrate(ctx context.Context, session *gocql.Session, dir string) error {
}
}

if err = session.AwaitSchemaAgreement(ctx); err != nil {
return fmt.Errorf("awaiting schema agreement failed: %s", err)
}

return nil
}

Expand Down Expand Up @@ -149,6 +173,12 @@ func applyMigration(ctx context.Context, session *gocql.Session, path string, do
iq := gocqlx.Query(session.Query(stmt).WithContext(ctx), names)
defer iq.Release()

if DefaultAwaitSchemaAgreement.ShouldAwait(AwaitSchemaAgreementBeforeEachFile) {
if err = session.AwaitSchemaAgreement(ctx); err != nil {
return fmt.Errorf("awaiting schema agreement failed: %s", err)
}
}

i := 0
r := bytes.NewBuffer(b)
for {
Expand Down Expand Up @@ -176,6 +206,12 @@ func applyMigration(ctx context.Context, session *gocql.Session, path string, do
}
}

if DefaultAwaitSchemaAgreement.ShouldAwait(AwaitSchemaAgreementBeforeEachStatement) {
if err = session.AwaitSchemaAgreement(ctx); err != nil {
return fmt.Errorf("awaiting schema agreement failed: %s", err)
}
}

// execute
q := gocqlx.Query(session.Query(stmt).RetryPolicy(nil).WithContext(ctx), nil)
if err := q.ExecRelease(); err != nil {
Expand Down