Skip to content

Commit

Permalink
feat: introduce random sleep before clickhouse loads (#4193)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Dec 5, 2023
1 parent 09a1ab0 commit 85cfdcf
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions warehouse/integrations/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -160,6 +161,7 @@ type Clickhouse struct {
numWorkersDownloadLoadFiles int
s3EngineEnabledWorkspaceIDs []string
slowQueryThreshold time.Duration
randomLoadDelay func(string) time.Duration
}
}

Expand Down Expand Up @@ -234,6 +236,16 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats) *Clickhouse {
ch.config.numWorkersDownloadLoadFiles = conf.GetInt("Warehouse.clickhouse.numWorkersDownloadLoadFiles", 8)
ch.config.s3EngineEnabledWorkspaceIDs = conf.GetStringSlice("Warehouse.clickhouse.s3EngineEnabledWorkspaceIDs", nil)
ch.config.slowQueryThreshold = conf.GetDuration("Warehouse.clickhouse.slowQueryThreshold", 5, time.Minute)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
ch.config.randomLoadDelay = func(workspaceID string) time.Duration {
maxDelay := conf.GetDurationVar(
0,
time.Second,
fmt.Sprintf("Warehouse.clickhouse.%s.maxLoadDelay", workspaceID),
"Warehouse.clickhouse.maxLoadDelay",
)
return time.Duration(float64(maxDelay) * (1 - r.Float64()))
}

return ch
}
Expand Down Expand Up @@ -491,6 +503,11 @@ func (ch *Clickhouse) typecastDataFromType(data, dataType string) interface{} {

// loadTable loads table to clickhouse from the load files
func (ch *Clickhouse) loadTable(ctx context.Context, tableName string, tableSchemaInUpload model.TableSchema) (err error) {
if delay := ch.config.randomLoadDelay(ch.Warehouse.WorkspaceID); delay > 0 {
if err = misc.SleepCtx(ctx, delay); err != nil {
return
}
}
if ch.UseS3CopyEngineForLoading() {
return ch.loadByCopyCommand(ctx, tableName, tableSchemaInUpload)
}
Expand Down

0 comments on commit 85cfdcf

Please sign in to comment.