Skip to content

Commit

Permalink
fix: clickhouse zookeeper table metadata (#4121)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Nov 17, 2023
1 parent 6c28e25 commit 41e060a
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 26 deletions.
27 changes: 11 additions & 16 deletions warehouse/integrations/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,22 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/types"

sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/logfield"

"github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/cenkalti/backoff/v4"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/ClickHouse/clickhouse-go"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"

sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/integrations/types"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -793,7 +788,7 @@ func (ch *Clickhouse) createUsersTable(ctx context.Context, name string, columns
if len(strings.TrimSpace(cluster)) > 0 {
clusterClause = fmt.Sprintf(`ON CLUSTER %q`, cluster)
engine = fmt.Sprintf(`%s%s`, "Replicated", engine)
engineOptions = `'/clickhouse/{cluster}/tables/{database}/{table}', '{replica}'`
engineOptions = fmt.Sprintf(`'/clickhouse/{cluster}/tables/%s/{database}/{table}', '{replica}'`, uuid.New().String())
}
sqlStatement := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %q.%q %s ( %v ) ENGINE = %s(%s) ORDER BY %s PARTITION BY toDate(%s)`, ch.Namespace, name, clusterClause, ch.ColumnsWithDataTypes(name, columns, notNullableColumns), engine, engineOptions, getSortKeyTuple(sortKeyFields), partitionField)
ch.logger.Infof("CH: Creating table in clickhouse for ch:%s : %v", ch.Warehouse.Destination.ID, sqlStatement)
Expand Down Expand Up @@ -835,7 +830,7 @@ func (ch *Clickhouse) CreateTable(ctx context.Context, tableName string, columns
if len(strings.TrimSpace(cluster)) > 0 {
clusterClause = fmt.Sprintf(`ON CLUSTER %q`, cluster)
engine = fmt.Sprintf(`%s%s`, "Replicated", engine)
engineOptions = `'/clickhouse/{cluster}/tables/{database}/{table}', '{replica}'`
engineOptions = fmt.Sprintf(`'/clickhouse/{cluster}/tables/%s/{database}/{table}', '{replica}'`, uuid.New().String())
}
var orderByClause string
if len(sortKeyFields) > 0 {
Expand Down
97 changes: 87 additions & 10 deletions warehouse/integrations/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats/memstats"

clickhousestd "github.com/ClickHouse/clickhouse-go"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/compose-test/compose"
"github.com/rudderlabs/compose-test/testcompose"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/health"
Expand All @@ -38,9 +40,9 @@ import (
)

func TestIntegration(t *testing.T) {
if os.Getenv("SLOW") != "1" {
t.Skip("Skipping tests. Add 'SLOW=1' env var to run test.")
}
//if os.Getenv("SLOW") != "1" {
// t.Skip("Skipping tests. Add 'SLOW=1' env var to run test.")
//}

c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.clickhouse.yml", "testdata/docker-compose.clickhouse-cluster.yml", "../testdata/docker-compose.jobsdb.yml", "../testdata/docker-compose.minio.yml"}))
c.Start(context.Background())
Expand Down Expand Up @@ -160,7 +162,7 @@ func TestIntegration(t *testing.T) {
destinationID string
warehouseEvents testhelper.EventsCountMap
warehouseModifiedEvents testhelper.EventsCountMap
clusterSetup func(t testing.TB)
clusterSetup func(t *testing.T)
db *sql.DB
stagingFilePrefix string
}{
Expand Down Expand Up @@ -188,9 +190,9 @@ func TestIntegration(t *testing.T) {
"aliases": 8,
"groups": 8,
},
clusterSetup: func(t testing.TB) {
clusterSetup: func(t *testing.T) {
t.Helper()
initializeClickhouseClusterMode(t, dbs[1:], tables)
initializeClickhouseClusterMode(t, dbs[1:], tables, clusterPort1)
},
stagingFilePrefix: "testdata/upload-cluster-job",
},
Expand Down Expand Up @@ -1018,7 +1020,9 @@ func connectClickhouseDB(ctx context.Context, t testing.TB, dsn string) *sql.DB
defer cancel()

require.Eventually(t, func() bool {
return db.PingContext(ctx) == nil
err := db.PingContext(ctx)
t.Log(err)
return err == nil
}, time.Minute, time.Second)

err = db.PingContext(ctx)
Expand All @@ -1027,7 +1031,7 @@ func connectClickhouseDB(ctx context.Context, t testing.TB, dsn string) *sql.DB
return db
}

func initializeClickhouseClusterMode(t testing.TB, clusterDBs []*sql.DB, tables []string) {
func initializeClickhouseClusterMode(t *testing.T, clusterDBs []*sql.DB, tables []string, clusterPost int) {
t.Helper()

type ColumnInfoT struct {
Expand Down Expand Up @@ -1161,6 +1165,79 @@ func initializeClickhouseClusterMode(t testing.TB, clusterDBs []*sql.DB, tables
}))
}

t.Run("Create Drop Create", func(t *testing.T) {
clusterDB := connectClickhouseDB(context.Background(), t, fmt.Sprintf("tcp://%s:%d?compress=false&database=%s&password=%s&secure=false&skip_verify=true&username=%s",
"localhost", clusterPost, "rudderdb", "rudder-password", "rudder",
))

t.Run("with UUID", func(t *testing.T) {
testTable := "test_table_with_uuid"

createTableSQLStatement := func() string {
return fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS "rudderdb".%[1]q ON CLUSTER "rudder_cluster" (
"id" String, "received_at" DateTime
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{cluster}/tables/%[2]s/{database}/{table}',
'{replica}'
)
ORDER BY
("received_at", "id") PARTITION BY toDate(received_at);`,
testTable,
uuid.New().String(),
)
}

require.NoError(t, testhelper.WithConstantRetries(func() error {
_, err := clusterDB.Exec(createTableSQLStatement())
return err
}))
require.NoError(t, testhelper.WithConstantRetries(func() error {
_, err := clusterDB.Exec(fmt.Sprintf(`DROP TABLE rudderdb.%[1]s ON CLUSTER "rudder_cluster";`, testTable))
return err
}))
require.NoError(t, testhelper.WithConstantRetries(func() error {
_, err := clusterDB.Exec(createTableSQLStatement())
return err
}))
})
t.Run("without UUID", func(t *testing.T) {
testTable := "test_table_without_uuid"

createTableSQLStatement := func() string {
return fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS "rudderdb".%[1]q ON CLUSTER "rudder_cluster" (
"id" String, "received_at" DateTime
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{cluster}/tables/{database}/{table}',
'{replica}'
)
ORDER BY
("received_at", "id") PARTITION BY toDate(received_at);`,
testTable,
)
}

require.NoError(t, testhelper.WithConstantRetries(func() error {
_, err := clusterDB.Exec(createTableSQLStatement())
return err
}))
require.NoError(t, testhelper.WithConstantRetries(func() error {
_, err := clusterDB.Exec(fmt.Sprintf(`DROP TABLE rudderdb.%[1]s ON CLUSTER "rudder_cluster";`, testTable))
return err
}))

err := testhelper.WithConstantRetries(func() error {
_, err := clusterDB.Exec(createTableSQLStatement())
return err
})
require.Error(t, err)

var clickhouseErr *clickhousestd.Exception
require.ErrorAs(t, err, &clickhouseErr)
require.Equal(t, int32(253), clickhouseErr.Code)
})
})
// Alter columns to all the cluster tables
for _, clusterDB := range clusterDBs {
for tableName, columnInfos := range tableColumnInfoMap {
Expand Down

0 comments on commit 41e060a

Please sign in to comment.