diff --git a/warehouse/integrations/clickhouse/clickhouse.go b/warehouse/integrations/clickhouse/clickhouse.go index 604cabc3d5..85e1c1d63b 100644 --- a/warehouse/integrations/clickhouse/clickhouse.go +++ b/warehouse/integrations/clickhouse/clickhouse.go @@ -21,27 +21,22 @@ import ( "strings" "time" - "github.com/rudderlabs/rudder-server/warehouse/integrations/types" - - sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" - "github.com/rudderlabs/rudder-server/warehouse/logfield" - - "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" - - "github.com/rudderlabs/rudder-server/warehouse/internal/model" - - "github.com/cenkalti/backoff/v4" - - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/ClickHouse/clickhouse-go" + "github.com/cenkalti/backoff/v4" + "github.com/google/uuid" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/warehouse/client" - + sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" + "github.com/rudderlabs/rudder-server/warehouse/integrations/types" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" + "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" + "github.com/rudderlabs/rudder-server/warehouse/logfield" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) @@ -793,7 +788,7 @@ func (ch *Clickhouse) createUsersTable(ctx context.Context, name string, columns if len(strings.TrimSpace(cluster)) > 0 { clusterClause = fmt.Sprintf(`ON CLUSTER %q`, cluster) engine = fmt.Sprintf(`%s%s`, "Replicated", engine) - engineOptions = `'/clickhouse/{cluster}/tables/{database}/{table}', '{replica}'` + engineOptions = fmt.Sprintf(`'/clickhouse/{cluster}/tables/%s/{database}/{table}', '{replica}'`, uuid.New().String()) } sqlStatement := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %q.%q %s ( %v ) ENGINE = %s(%s) ORDER BY %s PARTITION BY toDate(%s)`, ch.Namespace, name, clusterClause, ch.ColumnsWithDataTypes(name, columns, notNullableColumns), engine, engineOptions, getSortKeyTuple(sortKeyFields), partitionField) ch.logger.Infof("CH: Creating table in clickhouse for ch:%s : %v", ch.Warehouse.Destination.ID, sqlStatement) @@ -835,7 +830,7 @@ func (ch *Clickhouse) CreateTable(ctx context.Context, tableName string, columns if len(strings.TrimSpace(cluster)) > 0 { clusterClause = fmt.Sprintf(`ON CLUSTER %q`, cluster) engine = fmt.Sprintf(`%s%s`, "Replicated", engine) - engineOptions = `'/clickhouse/{cluster}/tables/{database}/{table}', '{replica}'` + engineOptions = fmt.Sprintf(`'/clickhouse/{cluster}/tables/%s/{database}/{table}', '{replica}'`, uuid.New().String()) } var orderByClause string if len(sortKeyFields) > 0 { diff --git a/warehouse/integrations/clickhouse/clickhouse_test.go b/warehouse/integrations/clickhouse/clickhouse_test.go index 866594186d..675fe06d92 100644 --- a/warehouse/integrations/clickhouse/clickhouse_test.go +++ b/warehouse/integrations/clickhouse/clickhouse_test.go @@ -12,9 +12,9 @@ import ( "testing" "time" - "github.com/rudderlabs/rudder-go-kit/stats/memstats" - + clickhousestd "github.com/ClickHouse/clickhouse-go" "github.com/golang/mock/gomock" + "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/rudderlabs/compose-test/compose" @@ -22,7 +22,9 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/filemanager" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" "github.com/rudderlabs/rudder-server/testhelper/health" @@ -38,9 +40,9 @@ import ( ) func TestIntegration(t *testing.T) { - if os.Getenv("SLOW") != "1" { - t.Skip("Skipping tests. Add 'SLOW=1' env var to run test.") - } + //if os.Getenv("SLOW") != "1" { + // t.Skip("Skipping tests. Add 'SLOW=1' env var to run test.") + //} c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.clickhouse.yml", "testdata/docker-compose.clickhouse-cluster.yml", "../testdata/docker-compose.jobsdb.yml", "../testdata/docker-compose.minio.yml"})) c.Start(context.Background()) @@ -160,7 +162,7 @@ func TestIntegration(t *testing.T) { destinationID string warehouseEvents testhelper.EventsCountMap warehouseModifiedEvents testhelper.EventsCountMap - clusterSetup func(t testing.TB) + clusterSetup func(t *testing.T) db *sql.DB stagingFilePrefix string }{ @@ -188,9 +190,9 @@ func TestIntegration(t *testing.T) { "aliases": 8, "groups": 8, }, - clusterSetup: func(t testing.TB) { + clusterSetup: func(t *testing.T) { t.Helper() - initializeClickhouseClusterMode(t, dbs[1:], tables) + initializeClickhouseClusterMode(t, dbs[1:], tables, clusterPort1) }, stagingFilePrefix: "testdata/upload-cluster-job", }, @@ -1018,7 +1020,9 @@ func connectClickhouseDB(ctx context.Context, t testing.TB, dsn string) *sql.DB defer cancel() require.Eventually(t, func() bool { - return db.PingContext(ctx) == nil + err := db.PingContext(ctx) + t.Log(err) + return err == nil }, time.Minute, time.Second) err = db.PingContext(ctx) @@ -1027,7 +1031,7 @@ func connectClickhouseDB(ctx context.Context, t testing.TB, dsn string) *sql.DB return db } -func initializeClickhouseClusterMode(t testing.TB, clusterDBs []*sql.DB, tables []string) { +func initializeClickhouseClusterMode(t *testing.T, clusterDBs []*sql.DB, tables []string, clusterPost int) { t.Helper() type ColumnInfoT struct { @@ -1161,6 +1165,79 @@ func initializeClickhouseClusterMode(t testing.TB, clusterDBs []*sql.DB, tables })) } + t.Run("Create Drop Create", func(t *testing.T) { + clusterDB := connectClickhouseDB(context.Background(), t, fmt.Sprintf("tcp://%s:%d?compress=false&database=%s&password=%s&secure=false&skip_verify=true&username=%s", + "localhost", clusterPost, "rudderdb", "rudder-password", "rudder", + )) + + t.Run("with UUID", func(t *testing.T) { + testTable := "test_table_with_uuid" + + createTableSQLStatement := func() string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "rudderdb".%[1]q ON CLUSTER "rudder_cluster" ( + "id" String, "received_at" DateTime + ) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{cluster}/tables/%[2]s/{database}/{table}', + '{replica}' + ) + ORDER BY + ("received_at", "id") PARTITION BY toDate(received_at);`, + testTable, + uuid.New().String(), + ) + } + + require.NoError(t, testhelper.WithConstantRetries(func() error { + _, err := clusterDB.Exec(createTableSQLStatement()) + return err + })) + require.NoError(t, testhelper.WithConstantRetries(func() error { + _, err := clusterDB.Exec(fmt.Sprintf(`DROP TABLE rudderdb.%[1]s ON CLUSTER "rudder_cluster";`, testTable)) + return err + })) + require.NoError(t, testhelper.WithConstantRetries(func() error { + _, err := clusterDB.Exec(createTableSQLStatement()) + return err + })) + }) + t.Run("without UUID", func(t *testing.T) { + testTable := "test_table_without_uuid" + + createTableSQLStatement := func() string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "rudderdb".%[1]q ON CLUSTER "rudder_cluster" ( + "id" String, "received_at" DateTime + ) ENGINE = ReplicatedReplacingMergeTree( + '/clickhouse/{cluster}/tables/{database}/{table}', + '{replica}' + ) + ORDER BY + ("received_at", "id") PARTITION BY toDate(received_at);`, + testTable, + ) + } + + require.NoError(t, testhelper.WithConstantRetries(func() error { + _, err := clusterDB.Exec(createTableSQLStatement()) + return err + })) + require.NoError(t, testhelper.WithConstantRetries(func() error { + _, err := clusterDB.Exec(fmt.Sprintf(`DROP TABLE rudderdb.%[1]s ON CLUSTER "rudder_cluster";`, testTable)) + return err + })) + + err := testhelper.WithConstantRetries(func() error { + _, err := clusterDB.Exec(createTableSQLStatement()) + return err + }) + require.Error(t, err) + + var clickhouseErr *clickhousestd.Exception + require.ErrorAs(t, err, &clickhouseErr) + require.Equal(t, int32(253), clickhouseErr.Code) + }) + }) // Alter columns to all the cluster tables for _, clusterDB := range clusterDBs { for tableName, columnInfos := range tableColumnInfoMap {