Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication support for ClickHouse state table + Fixes ListMigration query for ClickHouse #520

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
GO_TEST_FLAGS ?= -race -count=1 -v -timeout=10m
GOOS ?= $(shell go env GOOS)
GOARCH ?= $(shell go env GOARCH)

.PHONY: dist
dist:
Expand All @@ -10,6 +12,11 @@ dist:
GOOS=windows GOARCH=amd64 go build -o ./bin/goose-windows64.exe ./cmd/goose
GOOS=windows GOARCH=386 go build -o ./bin/goose-windows386.exe ./cmd/goose

build:
@mkdir -p ./bin
@rm -f ./bin/*
go build -o ./bin/goose-$(GOOS)-$(GOARCH) ./cmd/goose

.PHONY: clean
clean:
@find . -type f -name '*.FAIL' -delete
Expand Down
7 changes: 7 additions & 0 deletions dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ func SetDialect(s string) error {
store, err = dialect.NewStore(d)
return err
}

func AttachOptions(options map[string]string) error {
if storeWithOptions, ok := store.(dialect.StoreOptions); ok {
return storeWithOptions.AttachOptions(options)
}
return nil
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
module github.com/pressly/goose/v3

go 1.18
go 1.20

require (
github.com/ClickHouse/clickhouse-go/v2 v2.9.1
github.com/go-sql-driver/mysql v1.7.1
github.com/google/go-cmp v0.5.9
github.com/jackc/pgx/v5 v5.3.1
github.com/microsoft/go-mssqldb v0.21.0
github.com/ory/dockertest/v3 v3.10.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
Expand Down
4 changes: 3 additions & 1 deletion internal/cfg/cfg.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cfg

import "os"
import (
"os"
)

var (
GOOSEDRIVER = envOr("GOOSE_DRIVER", "")
Expand Down
49 changes: 44 additions & 5 deletions internal/dialect/dialectquery/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@ package dialectquery

import "fmt"

type Clickhouse struct{}
const (
paramOnCluster = "ON_CLUSTER"
paramClusterMacro = "CLUSTER_MACRO"
)

type clusterParameters struct {
OnCluster bool
ClusterMacro string
}

type Clickhouse struct {
Params clusterParameters
}

var _ Querier = (*Clickhouse)(nil)

Expand All @@ -11,10 +23,23 @@ func (c *Clickhouse) CreateTable(tableName string) string {
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = MergeTree()
ORDER BY (date)`
ENGINE = KeeperMap('/goose_version')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been coming back to this PR on/off, trying to think through the implementation from the perspective of a "general purpose" migration tool. On the surface, everything looks reasonable with the exception of changing the engine.

For context, another PR #530 came in requesting another engine.

Should the default engine be MergeTree but allow this to be configurable?

Maybe we could put our heads together on how to solve this so the tool can be useful for all or most ClickHouse users? cc @iamluc @arunk-s

I think PR #530 is on the right track to setting a custom engine, and afaics this would satisfy:

  1. default engine: MergeTree()
  2. ReplicatedMergeTree()
  3. KeeperMap()
  4. ... etc

Other questions, does the ORDER BY or PARTITION BY need to be configurable? Hopefully not so we can make this as generic as possible.

Last thought, leaving ENGINE = %s to the user may be a footgun and I wish we could do more upfront validation, but then we have to account for all the possible ClickHouse engine types and this seems like a lot of work to maintain.

Maybe there's a world where the user is allowed to supply a template for the table and as long as it contains:

  • version_id
  • is_applied (legacy field, deprecated)
  • tstamp

Not sure I like this idea, since goose should own the table it creates to ensure the identifiers match up across all implementations.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
I had a quick look at the PR and it looks good to me (and could replace my own PR #530)

My only concern is about the engine. I'm not a ClickHouse expert and didn't know about the KeeperMap engine, but it seems it must be enabled (https://clickhouse.com/docs/en/engines/table-engines/special/keeper-map) which could be not possible with hosted services.
I suggest to add the engine as a new option.

Copy link
Author

@arunk-s arunk-s Jul 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having an option to toggle underlying table engine can definitely work!

Other questions, does the ORDER BY or PARTITION BY need to be configurable? Hopefully not so we can make this as generic as possible.

I think these settings are affected by the choice of Engine so yeah they'll need to be configurable if the custom engine is allowed.

I can understand the concern with hosted services to not have KeeperMap though I doubt that platforms like ClickHouse Cloud disallows creating tables with this table engine.

Last thought, leaving ENGINE = %s to the user may be a footgun and I wish we could do more upfront validation, but then we have to account for all the possible ClickHouse engine types and this seems like a lot of work to maintain.

Definitely, convention over configuration is generally desirable however in this particular case I'm afraid the abstractions will leak a bit in order to properly support the underlying database. But in any case we need to have a sensible default and only selected number of choices.

The library does quite well supporting different databases but that does come at a cost I'm afraid.

I'll be happy to lend a hand with ClickHouse specific topics though it can take a bit of time for me to catch up.

PRIMARY KEY version_id`

qCluster := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = KeeperMap('/goose_version_repl')
PRIMARY KEY version_id`

if c.Params.OnCluster {
return fmt.Sprintf(qCluster, tableName, c.Params.ClusterMacro)
}
return fmt.Sprintf(q, tableName)
}

Expand All @@ -34,6 +59,20 @@ func (c *Clickhouse) GetMigrationByVersion(tableName string) string {
}

func (c *Clickhouse) ListMigrations(tableName string) string {
q := `SELECT version_id, is_applied FROM %s ORDER BY version_id DESC`
q := `SELECT version_id, is_applied FROM %s ORDER BY tstamp DESC`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is fixing a bug? (I think you mentioned this in the description)

The core goose library relies on a non-user-defined order for when migrations get applied, this is typically a sequential id in all other databases. It seems here we were using version_id, but this is incorrect. In lieu of a sequential id, we should be using time..

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. There is a no equivalent of an auto incrementing ID in clickhouse and it doesn't provide any way to get a globally incremented unique ID across its nodes.

So I relied on using timestamps, with just second precision, it was non-deterministic as migrations could have same timestamps.

With nanosecond precision, that chance is almost non-existent and the implementation further augments it with use of KeeeperMap Engine.

return fmt.Sprintf(q, tableName)
}

func (c *Clickhouse) AttachOptions(options map[string]string) error {
if val, ok := options[paramOnCluster]; ok {
if val == "true" {
clusterMacro, ok := options[paramClusterMacro]
if !ok {
clusterMacro = "{cluster}"
}
c.Params.ClusterMacro = clusterMacro
c.Params.OnCluster = true
}
}
return nil
}
116 changes: 116 additions & 0 deletions internal/dialect/dialectquery/clickhouse_dialect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package dialectquery

import (
"testing"

"github.com/google/go-cmp/cmp"
)

func TestClickhouseCreateTable(t *testing.T) {
t.Parallel()

type testData struct {
clickhouse *Clickhouse
result string
}

tests := []testData{
{
clickhouse: &Clickhouse{
Params: clusterParameters{
OnCluster: true,
ClusterMacro: "{cluster}",
},
},
result: `CREATE TABLE IF NOT EXISTS schema_migrations ON CLUSTER '{cluster}' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = KeeperMap('/goose_version_repl')
PRIMARY KEY version_id`,
},
{
clickhouse: &Clickhouse{
Params: clusterParameters{
OnCluster: true,
ClusterMacro: "dev-cluster",
},
},
result: `CREATE TABLE IF NOT EXISTS schema_migrations ON CLUSTER 'dev-cluster' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = KeeperMap('/goose_version_repl')
PRIMARY KEY version_id`,
},
}

for _, test := range tests {
out := test.clickhouse.CreateTable("schema_migrations")
if diff := cmp.Diff(test.result, out); diff != "" {
t.Errorf("clickhouse.CreateTable() mismatch (-want +got):\n%s", diff)
}
}
}

func TestClickhouseAttachOptions(t *testing.T) {
t.Parallel()

type testData struct {
options map[string]string
input *Clickhouse
err error
expected clusterParameters
}

tests := []testData{
{
options: map[string]string{
"ON_CLUSTER": "true",
},
input: &Clickhouse{},
err: nil,
expected: clusterParameters{
OnCluster: true,
ClusterMacro: "{cluster}",
},
},
{
options: map[string]string{
"ON_CLUSTER": "true",
"CLUSTER_MACRO": "dev-cluster",
},
input: &Clickhouse{},
err: nil,
expected: clusterParameters{
OnCluster: true,
ClusterMacro: "dev-cluster",
},
},
{
options: map[string]string{
"ON_CLUSTER": "false",
},
input: &Clickhouse{},
err: nil,
expected: clusterParameters{
OnCluster: false,
},
},
}

for _, test := range tests {
err := test.input.AttachOptions(test.options)
if err != test.err {
t.Errorf("AttachOptions mismatch expected error: %v, got: %v", test.err, err)
}
if diff := cmp.Diff(test.expected, test.input.Params); diff != "" {
t.Errorf("clickhouse.AttachOptions() mismatch (-want +got):\n%s", diff)
}
}

}
6 changes: 6 additions & 0 deletions internal/dialect/dialectquery/dialectquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ type Querier interface {
// The query should return the version_id and is_applied columns.
ListMigrations(tableName string) string
}

// QuerierOptions is an interface to provide specific options to a dialect.
// For ex: providing replication support to underlying migration table.
type QuerierOptions interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I initially added a "store" and "querier" the idea was to have the core goose library only interact with the store, and have no idea about the underlying queries.

The store abstracts the underlying querier which holds the raw string queries for the different database technologies.

Not sure if this made it easier/harder to evolve, but we should probably think about if there's a more elegant way to now thread options from the core library to those 2 internal layers.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I struggled around this as well. I didn't find an easier way to hookup options all the way to the implementation specific detail. I tried to minimize the leaky abstraction(s) but I think it will have to exposed in one way.

If you have any suggestions, I can try those out.

AttachOptions(map[string]string) error
}
11 changes: 11 additions & 0 deletions internal/dialect/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type Store interface {
ListMigrations(ctx context.Context, db *sql.DB, tableName string) ([]*ListMigrationsResult, error)
}

type StoreOptions interface {
AttachOptions(map[string]string) error
}

// NewStore returns a new Store for the given dialect.
func NewStore(d Dialect) (Store, error) {
var querier dialectquery.Querier
Expand Down Expand Up @@ -156,3 +160,10 @@ func (s *store) ListMigrations(ctx context.Context, db *sql.DB, tableName string
}
return migrations, nil
}

func (s *store) AttachOptions(options map[string]string) error {
if querierWithOptions, ok := s.querier.(dialectquery.QuerierOptions); ok {
return querierWithOptions.AttachOptions(options)
}
return nil
}
21 changes: 18 additions & 3 deletions internal/testdb/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"database/sql"
"fmt"
"log"
"os"
"path"
"strconv"
"time"

Expand All @@ -16,15 +18,21 @@ import (
const (
// https://hub.docker.com/r/clickhouse/clickhouse-server/
CLICKHOUSE_IMAGE = "clickhouse/clickhouse-server"
CLICKHOUSE_VERSION = "22.9-alpine"
CLICKHOUSE_VERSION = "23.2.6.34-alpine"

CLICKHOUSE_DB = "clickdb"
CLICKHOUSE_USER = "clickuser"
CLICKHOUSE_PASSWORD = "password1"
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT = "1"
)

func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) {
var (
// DB_HOST is the hostname where ClickHouse is running
// localhost for local runs and docker for CI runs
DB_HOST string
)

func newClickHouse(confDir string, opts ...OptionsFunc) (*sql.DB, func(), error) {
option := &options{}
for _, f := range opts {
f(option)
Expand All @@ -34,6 +42,8 @@ func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) {
if err != nil {
return nil, nil, err
}
// Minimal additional configuration (config.d) to enable cluster mode
replconf := path.Join(confDir, "clickhouse-replicated.xml")
runOptions := &dockertest.RunOptions{
Repository: CLICKHOUSE_IMAGE,
Tag: CLICKHOUSE_VERSION,
Expand All @@ -45,6 +55,7 @@ func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) {
},
Labels: map[string]string{"goose_test": "1"},
PortBindings: make(map[docker.Port][]docker.PortBinding),
Mounts: []string{fmt.Sprintf("%s:/etc/clickhouse-server/config.d/testconf.xml", replconf)},
}
// Port 8123 is used for HTTP, but we're using the TCP protocol endpoint (port 9000).
// Ref: https://clickhouse.com/docs/en/interfaces/http/
Expand Down Expand Up @@ -74,8 +85,12 @@ func newClickHouse(opts ...OptionsFunc) (*sql.DB, func(), error) {
log.Printf("failed to purge resource: %v", err)
}
}
host := os.Getenv("DB_HOST")
if host == "" {
host = "localhost"
}
// Fetch port assigned to container
address := fmt.Sprintf("%s:%s", "localhost", container.GetPort("9000/tcp"))
address := fmt.Sprintf("%s:%s", host, container.GetPort("9000/tcp"))

var db *sql.DB
// Exponential backoff-retry, because the application in the container
Expand Down
4 changes: 2 additions & 2 deletions internal/testdb/testdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package testdb
import "database/sql"

// NewClickHouse starts a ClickHouse docker container. Returns db connection and a docker cleanup function.
func NewClickHouse(options ...OptionsFunc) (db *sql.DB, cleanup func(), err error) {
return newClickHouse(options...)
func NewClickHouse(confDir string, options ...OptionsFunc) (db *sql.DB, cleanup func(), err error) {
return newClickHouse(confDir, options...)
}

// NewPostgres starts a PostgreSQL docker container. Returns db connection and a docker cleanup function.
Expand Down
1 change: 1 addition & 0 deletions migration_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const (
resetColor = "\033[00m"
)

// TODO: verbose is not thread safe and will report a race error if you use it via parallel tests
func verboseInfo(s string, args ...interface{}) {
if verbose {
if noColor {
Expand Down
Loading