Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(modules.clickhouse): Add zookeeper for clickhouse clusterization #1995

Merged
merged 11 commits into from
Feb 2, 2024
9 changes: 9 additions & 0 deletions docs/modules/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ initialization before starting the service.
[Init script content](../../modules/clickhouse/testdata/init-db.sh)
<!--/codeinclude-->

#### Zookeeper

Clusterized ClickHouse requires to start Zookeeper and pass link to it via `config.xml`.

<!--codeinclude-->
[Include zookeeper](../../modules/clickhouse/clickhouse_test.go) inside_block:withZookeeper
<!--/codeinclude-->


#### Custom configuration

If you need to set a custom configuration, the module provides the `WithConfigFile` option to pass the path to a custom configuration file in XML format.
Expand Down
54 changes: 54 additions & 0 deletions modules/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package clickhouse

import (
"bytes"
"context"
_ "embed"
"fmt"
"os"
"path/filepath"
"strings"
"text/template"

"github.com/docker/go-connections/nat"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)

//go:embed mounts/zk_config.xml.tpl
var zookeeperConfigTpl string

const (
defaultUser = "default"
defaultDatabaseName = "clickhouse"
Expand Down Expand Up @@ -72,6 +79,53 @@ func (c *ClickHouseContainer) ConnectionString(ctx context.Context, args ...stri
return connectionString, nil
}

// ZookeeperOptions arguments for zookeeper in clickhouse
type ZookeeperOptions struct {
Host, Port string
}

// renderZookeeperConfig generate default zookeeper configuration for clickhouse
func renderZookeeperConfig(settings ZookeeperOptions) ([]byte, error) {
tpl, err := template.New("bootstrap.yaml").Parse(zookeeperConfigTpl)
if err != nil {
return nil, fmt.Errorf("failed to parse zookeeper config file template: %w", err)
}

var bootstrapConfig bytes.Buffer
if err := tpl.Execute(&bootstrapConfig, settings); err != nil {
return nil, fmt.Errorf("failed to render zookeeper bootstrap config template: %w", err)
}

return bootstrapConfig.Bytes(), nil
}

// WithZookeeper pass a config to connect clickhouse with zookeeper and make clickhouse as cluster
func WithZookeeper(host, port string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) {
f, err := os.CreateTemp("", "clickhouse-tc-config-")
if err != nil {
panic(err)
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}

defer f.Close()

// write data to the temporary file
data, err := renderZookeeperConfig(ZookeeperOptions{Host: host, Port: port})
if err != nil {
panic(err)
}
if _, err := f.Write(data); err != nil {
panic(err)
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}
cf := testcontainers.ContainerFile{
HostFilePath: f.Name(),
ContainerFilePath: "/etc/clickhouse-server/config.d/zookeeper_config.xml",
FileMode: 0o755,
}
req.Files = append(req.Files, cf)
}
}

// WithInitScripts sets the init scripts to be run when the container starts
func WithInitScripts(scripts ...string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) {
Expand Down
101 changes: 101 additions & 0 deletions modules/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/cenkalti/backoff/v4"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)

const (
Expand Down Expand Up @@ -219,6 +221,105 @@ func TestClickHouseWithConfigFile(t *testing.T) {
}
}

func TestClickHouseWithZookeeper(t *testing.T) {
ctx := context.Background()

// withZookeeper {
zkPort := nat.Port("2181/tcp")

zkcontainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
ExposedPorts: []string{zkPort.Port()},
Image: "zookeeper:3.7",
WaitingFor: wait.ForListeningPort(zkPort),
},
Started: true,
})
if err != nil {
t.Fatal(err)
}

ipaddr, err := zkcontainer.ContainerIP(ctx)
if err != nil {
t.Fatal(err)
}

container, err := RunContainer(ctx,
WithUsername(user),
WithPassword(password),
WithDatabase(dbname),
WithZookeeper(ipaddr, zkPort.Port()),
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
t.Fatal(err)
}
// }

// Clean up the container after the test is complete
t.Cleanup(func() {
assert.NoError(t, container.Terminate(ctx))
assert.NoError(t, zkcontainer.Terminate(ctx))
})

connectionHost, err := container.ConnectionHost(ctx)
assert.NoError(t, err)

conn, err := ch.Open(&ch.Options{
Addr: []string{connectionHost},
Auth: ch.Auth{
Database: dbname,
Username: user,
Password: password, // --> password is not required
},
})
assert.NoError(t, err)
assert.NotNil(t, conn)
defer conn.Close()

// perform assertions
data, err := performReplicatedCRUD(conn)
assert.NoError(t, err)
assert.Len(t, data, 1)
}

func performReplicatedCRUD(conn driver.Conn) ([]Test, error) {
var (
err error
res []Test
)

err = backoff.Retry(func() error {
err = conn.Exec(context.Background(), "CREATE TABLE replicated_test_table (id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/mdb.data_transfer_cp_cdc', '{replica}') PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return err
}

err = conn.Exec(context.Background(), "INSERT INTO replicated_test_table (id) VALUES (1);")
if err != nil {
return err
}

rows, err := conn.Query(context.Background(), "SELECT * FROM replicated_test_table;")
if err != nil {
return err
}

for rows.Next() {
var r Test

err := rows.Scan(&r.Id)
if err != nil {
return err
}

res = append(res, r)
}
return nil
}, backoff.NewExponentialBackOff())

return res, err
}

func performCRUD(conn driver.Conn) ([]Test, error) {
var (
err error
Expand Down
31 changes: 31 additions & 0 deletions modules/clickhouse/mounts/zk_config.xml.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0"?>
<clickhouse>
<zookeeper>
<node index="1">
<host>{{.Host}}</host>
<port>{{.Port}}</port>
</node>
</zookeeper>

<remote_servers>
<default>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers>
<macros>
<cluster>default</cluster>
<shard>shard</shard>
<replica>replica</replica>
</macros>

<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>

<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
laskoviymishka marked this conversation as resolved.
Show resolved Hide resolved
</clickhouse>