Skip to content

Commit

Permalink
feat: introduce random sleep before clickhouse loads
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Dec 5, 2023
1 parent 0f202e8 commit e10def0
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions warehouse/integrations/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -160,6 +161,7 @@ type Clickhouse struct {
numWorkersDownloadLoadFiles int
s3EngineEnabledWorkspaceIDs []string
slowQueryThreshold time.Duration
randomLoadDelay func(string) time.Duration
}
}

Expand Down Expand Up @@ -234,6 +236,15 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats) *Clickhouse {
ch.config.numWorkersDownloadLoadFiles = conf.GetInt("Warehouse.clickhouse.numWorkersDownloadLoadFiles", 8)
ch.config.s3EngineEnabledWorkspaceIDs = conf.GetStringSlice("Warehouse.clickhouse.s3EngineEnabledWorkspaceIDs", nil)
ch.config.slowQueryThreshold = conf.GetDuration("Warehouse.clickhouse.slowQueryThreshold", 5, time.Minute)
ch.config.randomLoadDelay = func(destinationID string) time.Duration {
maxDelay := conf.GetDurationVar(
5,
time.Second,
fmt.Sprintf("Warehouse.clickhouse.%s.maxLoadDelay", destinationID),
"Warehouse.clickhouse.maxLoadDelay",
)
return time.Duration(float64(maxDelay) * (1 - rand.Float64()))
}

return ch
}
Expand Down Expand Up @@ -491,6 +502,9 @@ func (ch *Clickhouse) typecastDataFromType(data, dataType string) interface{} {

// loadTable loads table to clickhouse from the load files
func (ch *Clickhouse) loadTable(ctx context.Context, tableName string, tableSchemaInUpload model.TableSchema) (err error) {
if misc.SleepCtx(ctx, ch.config.randomLoadDelay(ch.Warehouse.Destination.ID)) != nil {
return nil
}

Check warning on line 507 in warehouse/integrations/clickhouse/clickhouse.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/clickhouse/clickhouse.go#L506-L507

Added lines #L506 - L507 were not covered by tests
if ch.UseS3CopyEngineForLoading() {
return ch.loadByCopyCommand(ctx, tableName, tableSchemaInUpload)
}
Expand Down

0 comments on commit e10def0

Please sign in to comment.