diff --git a/Makefile b/Makefile index a41d259a0..06b8296e2 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,6 @@ GO_TEST_FLAGS ?= -race -count=1 -v -timeout=10m +GOOS ?= $(shell go env GOOS) +GOARCH ?= $(shell go env GOARCH) .PHONY: dist dist: @@ -10,6 +12,11 @@ dist: GOOS=windows GOARCH=amd64 go build -o ./bin/goose-windows64.exe ./cmd/goose GOOS=windows GOARCH=386 go build -o ./bin/goose-windows386.exe ./cmd/goose +build: + @mkdir -p ./bin + @rm -f ./bin/* + go build -o ./bin/goose-$(GOOS)-$(GOARCH) ./cmd/goose + .PHONY: clean clean: @find . -type f -name '*.FAIL' -delete diff --git a/dialect.go b/dialect.go index a14248002..4e53e61ab 100644 --- a/dialect.go +++ b/dialect.go @@ -39,3 +39,10 @@ func SetDialect(s string) error { store, err = dialect.NewStore(d) return err } + +func AttachOptions(options map[string]string) error { + if storeWithOptions, ok := store.(dialect.StoreOptions); ok { + return storeWithOptions.AttachOptions(options) + } + return nil +} diff --git a/go.mod b/go.mod index 795d2587b..e1dcf9003 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,11 @@ module github.com/pressly/goose/v3 -go 1.18 +go 1.20 require ( github.com/ClickHouse/clickhouse-go/v2 v2.9.1 github.com/go-sql-driver/mysql v1.7.1 + github.com/google/go-cmp v0.5.9 github.com/jackc/pgx/v5 v5.3.1 github.com/microsoft/go-mssqldb v0.21.0 github.com/ory/dockertest/v3 v3.10.0 diff --git a/go.sum b/go.sum index e027ff7d3..b44bf1b30 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,7 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= diff --git a/internal/cfg/cfg.go b/internal/cfg/cfg.go index aa9707633..d61ec358b 100644 --- a/internal/cfg/cfg.go +++ b/internal/cfg/cfg.go @@ -1,6 +1,8 @@ package cfg -import "os" +import ( + "os" +) var ( GOOSEDRIVER = envOr("GOOSE_DRIVER", "") diff --git a/internal/dialect/dialectquery/clickhouse.go b/internal/dialect/dialectquery/clickhouse.go index ca07f8684..a24c139db 100644 --- a/internal/dialect/dialectquery/clickhouse.go +++ b/internal/dialect/dialectquery/clickhouse.go @@ -2,7 +2,19 @@ package dialectquery import "fmt" -type Clickhouse struct{} +const ( + paramOnCluster = "ON_CLUSTER" + paramClusterMacro = "CLUSTER_MACRO" +) + +type clusterParameters struct { + OnCluster bool + ClusterMacro string +} + +type Clickhouse struct { + Params clusterParameters +} var _ Querier = (*Clickhouse)(nil) @@ -11,10 +23,23 @@ func (c *Clickhouse) CreateTable(tableName string) string { version_id Int64, is_applied UInt8, date Date default now(), - tstamp DateTime default now() + tstamp DateTime64(9, 'UTC') default now64(9, 'UTC') ) - ENGINE = MergeTree() - ORDER BY (date)` + ENGINE = KeeperMap('/goose_version') + PRIMARY KEY version_id` + + qCluster := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' ( + version_id Int64, + is_applied UInt8, + date Date default now(), + tstamp DateTime64(9, 'UTC') default now64(9, 'UTC') + ) + ENGINE = KeeperMap('/goose_version_repl') + PRIMARY KEY version_id` + + if c.Params.OnCluster { + return fmt.Sprintf(qCluster, tableName, c.Params.ClusterMacro) + } return fmt.Sprintf(q, tableName) } @@ -34,6 +59,20 @@ func (c *Clickhouse) GetMigrationByVersion(tableName string) string { } func (c *Clickhouse) ListMigrations(tableName string) string { - q := `SELECT version_id, is_applied FROM %s ORDER BY version_id DESC` + q := `SELECT version_id, is_applied FROM %s ORDER BY tstamp DESC` return fmt.Sprintf(q, tableName) } + +func (c *Clickhouse) AttachOptions(options map[string]string) error { + if val, ok := options[paramOnCluster]; ok { + if val == "true" { + clusterMacro, ok := options[paramClusterMacro] + if !ok { + clusterMacro = "{cluster}" + } + c.Params.ClusterMacro = clusterMacro + c.Params.OnCluster = true + } + } + return nil +} diff --git a/internal/dialect/dialectquery/clickhouse_dialect_test.go b/internal/dialect/dialectquery/clickhouse_dialect_test.go new file mode 100644 index 000000000..cf0b89113 --- /dev/null +++ b/internal/dialect/dialectquery/clickhouse_dialect_test.go @@ -0,0 +1,116 @@ +package dialectquery + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestClickhouseCreateTable(t *testing.T) { + t.Parallel() + + type testData struct { + clickhouse *Clickhouse + result string + } + + tests := []testData{ + { + clickhouse: &Clickhouse{ + Params: clusterParameters{ + OnCluster: true, + ClusterMacro: "{cluster}", + }, + }, + result: `CREATE TABLE IF NOT EXISTS schema_migrations ON CLUSTER '{cluster}' ( + version_id Int64, + is_applied UInt8, + date Date default now(), + tstamp DateTime64(9, 'UTC') default now64(9, 'UTC') + ) + ENGINE = KeeperMap('/goose_version_repl') + PRIMARY KEY version_id`, + }, + { + clickhouse: &Clickhouse{ + Params: clusterParameters{ + OnCluster: true, + ClusterMacro: "dev-cluster", + }, + }, + result: `CREATE TABLE IF NOT EXISTS schema_migrations ON CLUSTER 'dev-cluster' ( + version_id Int64, + is_applied UInt8, + date Date default now(), + tstamp DateTime64(9, 'UTC') default now64(9, 'UTC') + ) + ENGINE = KeeperMap('/goose_version_repl') + PRIMARY KEY version_id`, + }, + } + + for _, test := range tests { + out := test.clickhouse.CreateTable("schema_migrations") + if diff := cmp.Diff(test.result, out); diff != "" { + t.Errorf("clickhouse.CreateTable() mismatch (-want +got):\n%s", diff) + } + } +} + +func TestClickhouseAttachOptions(t *testing.T) { + t.Parallel() + + type testData struct { + options map[string]string + input *Clickhouse + err error + expected clusterParameters + } + + tests := []testData{ + { + options: map[string]string{ + "ON_CLUSTER": "true", + }, + input: &Clickhouse{}, + err: nil, + expected: clusterParameters{ + OnCluster: true, + ClusterMacro: "{cluster}", + }, + }, + { + options: map[string]string{ + "ON_CLUSTER": "true", + "CLUSTER_MACRO": "dev-cluster", + }, + input: &Clickhouse{}, + err: nil, + expected: clusterParameters{ + OnCluster: true, + ClusterMacro: "dev-cluster", + }, + }, + { + options: map[string]string{ + "ON_CLUSTER": "false", + }, + input: &Clickhouse{}, + err: nil, + expected: clusterParameters{ + OnCluster: false, + }, + }, + } + + for _, test := range tests { + err := test.input.AttachOptions(test.options) + if err != test.err { + t.Errorf("AttachOptions mismatch expected error: %v, got: %v", test.err, err) + } + if diff := cmp.Diff(test.expected, test.input.Params); diff != "" { + t.Errorf("clickhouse.AttachOptions() mismatch (-want +got):\n%s", diff) + } + } + +} diff --git a/internal/dialect/dialectquery/dialectquery.go b/internal/dialect/dialectquery/dialectquery.go index 482771aa1..d3d29eab1 100644 --- a/internal/dialect/dialectquery/dialectquery.go +++ b/internal/dialect/dialectquery/dialectquery.go @@ -26,3 +26,9 @@ type Querier interface { // The query should return the version_id and is_applied columns. ListMigrations(tableName string) string } + +// QuerierOptions is an interface to provide specific options to a dialect. +// For ex: providing replication support to underlying migration table. +type QuerierOptions interface { + AttachOptions(map[string]string) error +} diff --git a/internal/dialect/store.go b/internal/dialect/store.go index b51cdcefe..0c6b12fbd 100644 --- a/internal/dialect/store.go +++ b/internal/dialect/store.go @@ -45,6 +45,10 @@ type Store interface { ListMigrations(ctx context.Context, db *sql.DB, tableName string) ([]*ListMigrationsResult, error) } +type StoreOptions interface { + AttachOptions(map[string]string) error +} + // NewStore returns a new Store for the given dialect. func NewStore(d Dialect) (Store, error) { var querier dialectquery.Querier @@ -156,3 +160,10 @@ func (s *store) ListMigrations(ctx context.Context, db *sql.DB, tableName string } return migrations, nil } + +func (s *store) AttachOptions(options map[string]string) error { + if querierWithOptions, ok := s.querier.(dialectquery.QuerierOptions); ok { + return querierWithOptions.AttachOptions(options) + } + return nil +} diff --git a/internal/testdb/clickhouse.go b/internal/testdb/clickhouse.go index f0ee13a12..e18cf18ea 100644 --- a/internal/testdb/clickhouse.go +++ b/internal/testdb/clickhouse.go @@ -5,6 +5,8 @@ import ( "database/sql" "fmt" "log" + "os" + "path" "strconv" "time" @@ -16,7 +18,7 @@ import ( const ( // https://hub.docker.com/r/clickhouse/clickhouse-server/ CLICKHOUSE_IMAGE = "clickhouse/clickhouse-server" - CLICKHOUSE_VERSION = "22.9-alpine" + CLICKHOUSE_VERSION = "23.2.6.34-alpine" CLICKHOUSE_DB = "clickdb" CLICKHOUSE_USER = "clickuser" @@ -24,7 +26,13 @@ const ( CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT = "1" ) -func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) { +var ( + // DB_HOST is the hostname where ClickHouse is running + // localhost for local runs and docker for CI runs + DB_HOST string +) + +func newClickHouse(confDir string, opts ...OptionsFunc) (*sql.DB, func(), error) { option := &options{} for _, f := range opts { f(option) @@ -34,6 +42,8 @@ func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) { if err != nil { return nil, nil, err } + // Minimal additional configuration (config.d) to enable cluster mode + replconf := path.Join(confDir, "clickhouse-replicated.xml") runOptions := &dockertest.RunOptions{ Repository: CLICKHOUSE_IMAGE, Tag: CLICKHOUSE_VERSION, @@ -45,6 +55,7 @@ func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) { }, Labels: map[string]string{"goose_test": "1"}, PortBindings: make(map[docker.Port][]docker.PortBinding), + Mounts: []string{fmt.Sprintf("%s:/etc/clickhouse-server/config.d/testconf.xml", replconf)}, } // Port 8123 is used for HTTP, but we're using the TCP protocol endpoint (port 9000). // Ref: https://clickhouse.com/docs/en/interfaces/http/ @@ -74,8 +85,12 @@ func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) { log.Printf("failed to purge resource: %v", err) } } + host := os.Getenv("DB_HOST") + if host == "" { + host = "localhost" + } // Fetch port assigned to container - address := fmt.Sprintf("%s:%s", "localhost", container.GetPort("9000/tcp")) + address := fmt.Sprintf("%s:%s", host, container.GetPort("9000/tcp")) var db *sql.DB // Exponential backoff-retry, because the application in the container diff --git a/internal/testdb/testdb.go b/internal/testdb/testdb.go index ffa831eab..49c417b5a 100644 --- a/internal/testdb/testdb.go +++ b/internal/testdb/testdb.go @@ -3,8 +3,8 @@ package testdb import "database/sql" // NewClickHouse starts a ClickHouse docker container. Returns db connection and a docker cleanup function. -func NewClickHouse(options ...OptionsFunc) (db *sql.DB, cleanup func(), err error) { - return newClickHouse(options...) +func NewClickHouse(confDir string, options ...OptionsFunc) (db *sql.DB, cleanup func(), err error) { + return newClickHouse(confDir, options...) } // NewPostgres starts a PostgreSQL docker container. Returns db connection and a docker cleanup function. diff --git a/migration_sql.go b/migration_sql.go index f74b70d75..6254a57b8 100644 --- a/migration_sql.go +++ b/migration_sql.go @@ -94,6 +94,7 @@ const ( resetColor = "\033[00m" ) +// TODO: verbose is not thread safe and will report a race error if you use it via parallel tests func verboseInfo(s string, args ...interface{}) { if verbose { if noColor { diff --git a/tests/clickhouse/clickhouse-replicated.xml b/tests/clickhouse/clickhouse-replicated.xml new file mode 100644 index 000000000..e69abd3e0 --- /dev/null +++ b/tests/clickhouse/clickhouse-replicated.xml @@ -0,0 +1,48 @@ + + + + cluster + 0 + cluster + 0 + cluster-0-0 + + + /keeper_map_tables + + + + true + + localhost + 9000 + + + + + + + 2181 + 0 + /var/log/clickhouse-server/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 0 + localhost + 9444 + + + + + + + + localhost + 2181 + + + + /clickhouse/cluster/task_queue/ddl + + \ No newline at end of file diff --git a/tests/clickhouse/clickhouse_test.go b/tests/clickhouse/clickhouse_test.go index 2ccfbe947..49b1d5df1 100644 --- a/tests/clickhouse/clickhouse_test.go +++ b/tests/clickhouse/clickhouse_test.go @@ -23,10 +23,11 @@ func TestClickUpDownAll(t *testing.T) { t.Parallel() migrationDir := filepath.Join("testdata", "migrations") - db, cleanup, err := testdb.NewClickHouse() + workingDir, err := os.Getwd() + check.NoError(t, err) + db, cleanup, err := testdb.NewClickHouse(workingDir) check.NoError(t, err) t.Cleanup(cleanup) - /* This test applies all up migrations, asserts we have all the entries in the versions table, applies all down migration and asserts we have zero @@ -73,7 +74,9 @@ func TestClickHouseFirstThree(t *testing.T) { t.Parallel() migrationDir := filepath.Join("testdata", "migrations") - db, cleanup, err := testdb.NewClickHouse() + workingDir, err := os.Getwd() + check.NoError(t, err) + db, cleanup, err := testdb.NewClickHouse(workingDir) check.NoError(t, err) t.Cleanup(cleanup) @@ -126,6 +129,45 @@ func TestClickHouseFirstThree(t *testing.T) { } } +func TestClickHouseOnCluster(t *testing.T) { + err := goose.AttachOptions(map[string]string{ + "ON_CLUSTER": "true", + }) + check.NoError(t, err) + + workingDir, err := os.Getwd() + check.NoError(t, err) + db, cleanup, err := testdb.NewClickHouse(workingDir) + check.NoError(t, err) + t.Cleanup(cleanup) + + _, err = goose.GetDBVersion(db) + check.NoError(t, err) + + migrationDir := filepath.Join("testdata", "migrations") + // Collect migrations so we don't have to hard-code the currentVersion + // in an assertion later in the test. + migrations, err := goose.CollectMigrations(migrationDir, 0, goose.MaxVersion) + check.NoError(t, err) + + currentVersion, err := goose.GetDBVersion(db) + check.NoError(t, err) + check.Number(t, currentVersion, 0) + + err = goose.Up(db, migrationDir) + check.NoError(t, err) + currentVersion, err = goose.GetDBVersion(db) + check.NoError(t, err) + check.Number(t, currentVersion, len(migrations)) + + err = goose.DownTo(db, migrationDir, 0) + check.NoError(t, err) + + currentVersion, err = goose.GetDBVersion(db) + check.NoError(t, err) + check.Number(t, currentVersion, 0) +} + func TestRemoteImportMigration(t *testing.T) { t.Parallel() // TODO(mf): use TestMain and create a proper "long" or "remote" flag. @@ -141,7 +183,9 @@ func TestRemoteImportMigration(t *testing.T) { // and craft a long INSERT statement. migrationDir := filepath.Join("testdata", "migrations-remote") - db, cleanup, err := testdb.NewClickHouse() + workingDir, err := os.Getwd() + check.NoError(t, err) + db, cleanup, err := testdb.NewClickHouse(workingDir) check.NoError(t, err) t.Cleanup(cleanup)