Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication support for ClickHouse state table + Fixes ListMigration query for ClickHouse #520

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
GO_TEST_FLAGS ?= -race -count=1 -v -timeout=10m
GOOS ?= $(shell go env GOOS)
GOARCH ?= $(shell go env GOARCH)

.PHONY: dist
dist:
Expand All @@ -10,6 +12,11 @@ dist:
GOOS=windows GOARCH=amd64 go build -o ./bin/goose-windows64.exe ./cmd/goose
GOOS=windows GOARCH=386 go build -o ./bin/goose-windows386.exe ./cmd/goose

build:
@mkdir -p ./bin
@rm -f ./bin/*
go build -o ./bin/goose-$(GOOS)-$(GOARCH) ./cmd/goose

.PHONY: clean
clean:
@find . -type f -name '*.FAIL' -delete
Expand Down
7 changes: 7 additions & 0 deletions dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ func SetDialect(s string) error {
store, err = dialect.NewStore(d)
return err
}

func AttachOptions(options map[string]string) error {
if storeWithOptions, ok := store.(dialect.StoreOptions); ok {
return storeWithOptions.AttachOptions(options)
}
return nil
}
19 changes: 18 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
module github.com/pressly/goose/v3

go 1.18
go 1.20

require (
github.com/ClickHouse/clickhouse-go/v2 v2.9.1
github.com/go-sql-driver/mysql v1.7.1
github.com/google/go-cmp v0.5.9
github.com/jackc/pgx/v5 v5.3.1
github.com/microsoft/go-mssqldb v0.21.0
github.com/ory/dockertest/v3 v3.10.0
github.com/stretchr/testify v1.8.2
arunk-s marked this conversation as resolved.
Show resolved Hide resolved
github.com/testcontainers/testcontainers-go v0.19.0
arunk-s marked this conversation as resolved.
Show resolved Hide resolved
github.com/vertica/vertica-sql-go v1.3.2
github.com/ziutek/mymysql v1.5.4
modernc.org/sqlite v1.22.1
Expand All @@ -20,8 +23,12 @@ require (
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/containerd/containerd v1.6.19 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/cli v23.0.6+incompatible // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v23.0.6+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
Expand All @@ -33,6 +40,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.15 // indirect
Expand All @@ -42,15 +50,20 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/paulmach/orb v0.9.2 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/segmentio/asm v1.2.0 // indirect
Expand All @@ -63,9 +76,13 @@ require (
go.opentelemetry.io/otel/trace v1.15.1 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
howett.net/plist v1.0.0 // indirect
Expand Down
127 changes: 127 additions & 0 deletions go.sum

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion internal/cfg/cfg.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cfg

import "os"
import (
"os"
)

var (
GOOSEDRIVER = envOr("GOOSE_DRIVER", "")
Expand Down
49 changes: 44 additions & 5 deletions internal/dialect/dialectquery/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@ package dialectquery

import "fmt"

type Clickhouse struct{}
const (
paramOnCluster = "ON_CLUSTER"
paramClusterMacro = "CLUSTER_MACRO"
)

type clusterParameters struct {
OnCluster bool
ClusterMacro string
}

type Clickhouse struct {
Params clusterParameters
}

var _ Querier = (*Clickhouse)(nil)

Expand All @@ -11,10 +23,23 @@ func (c *Clickhouse) CreateTable(tableName string) string {
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = MergeTree()
ORDER BY (date)`
ENGINE = KeeperMap('/goose_version')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been coming back to this PR on/off, trying to think through the implementation from the perspective of a "general purpose" migration tool. On the surface, everything looks reasonable with the exception of changing the engine.

For context, another PR #530 came in requesting another engine.

Should the default engine be MergeTree but allow this to be configurable?

Maybe we could put our heads together on how to solve this so the tool can be useful for all or most ClickHouse users? cc @iamluc @arunk-s

I think PR #530 is on the right track to setting a custom engine, and afaics this would satisfy:

  1. default engine: MergeTree()
  2. ReplicatedMergeTree()
  3. KeeperMap()
  4. ... etc

Other questions, does the ORDER BY or PARTITION BY need to be configurable? Hopefully not so we can make this as generic as possible.

Last thought, leaving ENGINE = %s to the user may be a footgun and I wish we could do more upfront validation, but then we have to account for all the possible ClickHouse engine types and this seems like a lot of work to maintain.

Maybe there's a world where the user is allowed to supply a template for the table and as long as it contains:

  • version_id
  • is_applied (legacy field, deprecated)
  • tstamp

Not sure I like this idea, since goose should own the table it creates to ensure the identifiers match up across all implementations.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
I had a quick look at the PR and it looks good to me (and could replace my own PR #530)

My only concern is about the engine. I'm not a ClickHouse expert and didn't know about the KeeperMap engine, but it seems it must be enabled (https://clickhouse.com/docs/en/engines/table-engines/special/keeper-map) which could be not possible with hosted services.
I suggest to add the engine as a new option.

Copy link
Author

@arunk-s arunk-s Jul 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having an option to toggle underlying table engine can definitely work!

Other questions, does the ORDER BY or PARTITION BY need to be configurable? Hopefully not so we can make this as generic as possible.

I think these settings are affected by the choice of Engine so yeah they'll need to be configurable if the custom engine is allowed.

I can understand the concern with hosted services to not have KeeperMap though I doubt that platforms like ClickHouse Cloud disallows creating tables with this table engine.

Last thought, leaving ENGINE = %s to the user may be a footgun and I wish we could do more upfront validation, but then we have to account for all the possible ClickHouse engine types and this seems like a lot of work to maintain.

Definitely, convention over configuration is generally desirable however in this particular case I'm afraid the abstractions will leak a bit in order to properly support the underlying database. But in any case we need to have a sensible default and only selected number of choices.

The library does quite well supporting different databases but that does come at a cost I'm afraid.

I'll be happy to lend a hand with ClickHouse specific topics though it can take a bit of time for me to catch up.

PRIMARY KEY version_id`

qCluster := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = KeeperMap('/goose_version_repl')
PRIMARY KEY version_id`

if c.Params.OnCluster {
return fmt.Sprintf(qCluster, tableName, c.Params.ClusterMacro)
}
return fmt.Sprintf(q, tableName)
}

Expand All @@ -34,6 +59,20 @@ func (c *Clickhouse) GetMigrationByVersion(tableName string) string {
}

func (c *Clickhouse) ListMigrations(tableName string) string {
q := `SELECT version_id, is_applied FROM %s ORDER BY version_id DESC`
q := `SELECT version_id, is_applied FROM %s ORDER BY tstamp DESC`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is fixing a bug? (I think you mentioned this in the description)

The core goose library relies on a non-user-defined order for when migrations get applied, this is typically a sequential id in all other databases. It seems here we were using version_id, but this is incorrect. In lieu of a sequential id, we should be using time..

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. There is a no equivalent of an auto incrementing ID in clickhouse and it doesn't provide any way to get a globally incremented unique ID across its nodes.

So I relied on using timestamps, with just second precision, it was non-deterministic as migrations could have same timestamps.

With nanosecond precision, that chance is almost non-existent and the implementation further augments it with use of KeeeperMap Engine.

return fmt.Sprintf(q, tableName)
}

func (c *Clickhouse) AttachOptions(options map[string]string) error {
if val, ok := options[paramOnCluster]; ok {
if val == "true" {
clusterMacro, ok := options[paramClusterMacro]
if !ok {
clusterMacro = "{cluster}"
}
c.Params.ClusterMacro = clusterMacro
c.Params.OnCluster = true
}
}
return nil
}
116 changes: 116 additions & 0 deletions internal/dialect/dialectquery/clickhouse_dialect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package dialectquery

import (
"testing"

"github.com/google/go-cmp/cmp"
)

func TestClickhouseCreateTable(t *testing.T) {
t.Parallel()

type testData struct {
clickhouse *Clickhouse
result string
}

tests := []testData{
{
clickhouse: &Clickhouse{
Params: clusterParameters{
OnCluster: true,
ClusterMacro: "{cluster}",
},
},
result: `CREATE TABLE IF NOT EXISTS schema_migrations ON CLUSTER '{cluster}' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = KeeperMap('/goose_version_repl')
PRIMARY KEY version_id`,
},
{
clickhouse: &Clickhouse{
Params: clusterParameters{
OnCluster: true,
ClusterMacro: "dev-cluster",
},
},
result: `CREATE TABLE IF NOT EXISTS schema_migrations ON CLUSTER 'dev-cluster' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime64(9, 'UTC') default now64(9, 'UTC')
)
ENGINE = KeeperMap('/goose_version_repl')
PRIMARY KEY version_id`,
},
}

for _, test := range tests {
out := test.clickhouse.CreateTable("schema_migrations")
if diff := cmp.Diff(test.result, out); diff != "" {
t.Errorf("clickhouse.CreateTable() mismatch (-want +got):\n%s", diff)
}
}
}

func TestClickhouseAttachOptions(t *testing.T) {
t.Parallel()

type testData struct {
options map[string]string
input *Clickhouse
err error
expected clusterParameters
}

tests := []testData{
{
options: map[string]string{
"ON_CLUSTER": "true",
},
input: &Clickhouse{},
err: nil,
expected: clusterParameters{
OnCluster: true,
ClusterMacro: "{cluster}",
},
},
{
options: map[string]string{
"ON_CLUSTER": "true",
"CLUSTER_MACRO": "dev-cluster",
},
input: &Clickhouse{},
err: nil,
expected: clusterParameters{
OnCluster: true,
ClusterMacro: "dev-cluster",
},
},
{
options: map[string]string{
"ON_CLUSTER": "false",
},
input: &Clickhouse{},
err: nil,
expected: clusterParameters{
OnCluster: false,
},
},
}

for _, test := range tests {
err := test.input.AttachOptions(test.options)
if err != test.err {
t.Errorf("AttachOptions mismatch expected error: %v, got: %v", test.err, err)
}
if diff := cmp.Diff(test.expected, test.input.Params); diff != "" {
t.Errorf("clickhouse.AttachOptions() mismatch (-want +got):\n%s", diff)
}
}

}
6 changes: 6 additions & 0 deletions internal/dialect/dialectquery/dialectquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ type Querier interface {
// The query should return the version_id and is_applied columns.
ListMigrations(tableName string) string
}

// QuerierOptions is an interface to provide specific options to a dialect.
// For ex: providing replication support to underlying migration table.
type QuerierOptions interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I initially added a "store" and "querier" the idea was to have the core goose library only interact with the store, and have no idea about the underlying queries.

The store abstracts the underlying querier which holds the raw string queries for the different database technologies.

Not sure if this made it easier/harder to evolve, but we should probably think about if there's a more elegant way to now thread options from the core library to those 2 internal layers.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I struggled around this as well. I didn't find an easier way to hookup options all the way to the implementation specific detail. I tried to minimize the leaky abstraction(s) but I think it will have to exposed in one way.

If you have any suggestions, I can try those out.

AttachOptions(map[string]string) error
}
11 changes: 11 additions & 0 deletions internal/dialect/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type Store interface {
ListMigrations(ctx context.Context, db *sql.DB, tableName string) ([]*ListMigrationsResult, error)
}

type StoreOptions interface {
AttachOptions(map[string]string) error
}

// NewStore returns a new Store for the given dialect.
func NewStore(d Dialect) (Store, error) {
var querier dialectquery.Querier
Expand Down Expand Up @@ -156,3 +160,10 @@ func (s *store) ListMigrations(ctx context.Context, db *sql.DB, tableName string
}
return migrations, nil
}

func (s *store) AttachOptions(options map[string]string) error {
if querierWithOptions, ok := s.querier.(dialectquery.QuerierOptions); ok {
return querierWithOptions.AttachOptions(options)
}
return nil
}
Loading