From 72b6f0ac8b9e90574d8ca2ee664d5a5c374363f6 Mon Sep 17 00:00:00 2001
From: Vladislav Klimenko
Date: Wed, 24 Oct 2018 14:15:10 +0300
Subject: [PATCH] Add ClickHouse database support

ClickHouse can now be tested along with all the other supported databases.
The ClickHouse implementation is modeled on the TimescaleDB one.
---
 .gitignore                                    |   1 +
 cmd/tsbs_generate_data/common/distribution.go |  20 +-
 cmd/tsbs_generate_data/common/simulation.go   |   2 +-
 .../devops/cpu_only_generate_data.go          |   8 +-
 .../devops/cpu_only_generate_data_test.go     |   8 +-
 .../devops/generate_data.go                   |   7 +-
 .../devops/generate_data_test.go              |   6 +-
 cmd/tsbs_generate_data/main.go                | 101 +++--
 cmd/tsbs_generate_data/main_test.go           |  22 +-
 .../databases/clickhouse/devops.go            | 390 ++++++++++++++++++
 .../databases/clickhouse/devops_test.go       |  86 ++++
 .../databases/timescaledb/devops.go           | 210 ++++++++--
 cmd/tsbs_generate_queries/main.go             |  21 +-
 .../uses/devops/common.go                     |  56 +--
 .../uses/devops/common_test.go                |  10 +-
 .../uses/devops/groupby_orderby_limit.go      |   4 +-
 .../uses/devops/lastpoint.go                  |   4 +-
 .../utils/time_interval.go                    |   3 +-
 cmd/tsbs_load_clickhouse/creator.go           | 234 +++++++++++
 cmd/tsbs_load_clickhouse/creator_test.go      |  90 ++++
 cmd/tsbs_load_clickhouse/main.go              | 115 ++++++
 cmd/tsbs_load_clickhouse/main_test.go         |  22 +
 cmd/tsbs_load_clickhouse/process.go           | 369 +++++++++++++++++
 cmd/tsbs_load_clickhouse/profile.go           |  49 +++
 cmd/tsbs_load_clickhouse/scan.go              | 113 +++++
 cmd/tsbs_load_clickhouse/scan_test.go         | 117 ++++++
 cmd/tsbs_load_timescaledb/main.go             |  13 +-
 cmd/tsbs_run_queries_clickhouse/main.go       | 160 +++++++
 cmd/tsbs_run_queries_timescaledb/main.go      |   2 +-
 docs/clickhouse.md                            | 163 ++++++++
 load/creator.go                               |   5 +
 load/duplex_channel.go                        |   8 +-
 load/duplex_channel_test.go                   |   8 +-
 load/loader.go                                |  72 +++-
 load/loader_test.go                           |   2 +-
 load/scan.go                                  |  78 ++--
 load/scan_test.go                             |   8 +-
 query/benchmarker.go                          | 103 ++---
 query/benchmarker_test.go                     |   6 +-
 query/clickhouse.go                           |  70 ++++
 query/scanner.go                              |  28 +-
 query/stat_processor.go                       |  44 +-
 query/stat_processor_test.go                  |  20 +-
 scripts/full_cycle_minitest_clickhouse.sh     |  15 +
 scripts/full_cycle_minitest_timescaledb.sh    |  13 +
 scripts/generate_data.sh                      |  10 +-
 scripts/generate_queries.sh                   |  25 +-
 scripts/load_clickhouse.sh                    |  34 ++
 scripts/load_common.sh                        |   2 +-
 scripts/run_queries_clickhouse.sh             |  61 +++
 scripts/run_queries_timescaledb.sh            |  46 +++
 51 files changed, 2756 insertions(+), 308 deletions(-)
 create mode 100644 cmd/tsbs_generate_queries/databases/clickhouse/devops.go
 create mode 100644 cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go
 create mode 100644 cmd/tsbs_load_clickhouse/creator.go
 create mode 100644 cmd/tsbs_load_clickhouse/creator_test.go
 create mode 100644 cmd/tsbs_load_clickhouse/main.go
 create mode 100644 cmd/tsbs_load_clickhouse/main_test.go
 create mode 100644 cmd/tsbs_load_clickhouse/process.go
 create mode 100644 cmd/tsbs_load_clickhouse/profile.go
 create mode 100644 cmd/tsbs_load_clickhouse/scan.go
 create mode 100644 cmd/tsbs_load_clickhouse/scan_test.go
 create mode 100644 cmd/tsbs_run_queries_clickhouse/main.go
 create mode 100644 docs/clickhouse.md
 create mode 100644 query/clickhouse.go
 create mode 100755 scripts/full_cycle_minitest_clickhouse.sh
 create mode 100755 scripts/full_cycle_minitest_timescaledb.sh
 create mode 100755 scripts/load_clickhouse.sh
 create mode 100755 scripts/run_queries_clickhouse.sh
 create mode 100755 scripts/run_queries_timescaledb.sh

diff --git a/.gitignore b/.gitignore
index 66a413df8..4332e1da6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 *.out
 *.log
 .DS_Store
+.idea
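Note: the new ClickHouse path can be exercised end to end the same way as the TimescaleDB one; scripts/full_cycle_minitest_clickhouse.sh in this patch wires the steps together. A minimal sketch of that cycle, assuming a local ClickHouse server with default credentials; the exact loader/runner flags live in cmd/tsbs_load_clickhouse/main.go and cmd/tsbs_run_queries_clickhouse/main.go, so treat these invocations as illustrative rather than definitive:

    # generate a small cpu-only dataset in the ClickHouse (TimescaleDB-compatible) format
    tsbs_generate_data -format clickhouse -use-case cpu-only -scale 10 -limit 1000 > /tmp/clickhouse_data

    # the loader reads the header and data rows from stdin
    tsbs_load_clickhouse < /tmp/clickhouse_data

    # generate a few queries and run them against the loaded data
    tsbs_generate_queries -format clickhouse -use-case devops -query-type lastpoint -scale 10 -queries 10 > /tmp/clickhouse_queries
    tsbs_run_queries_clickhouse < /tmp/clickhouse_queries
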
diff --git a/cmd/tsbs_generate_data/common/distribution.go b/cmd/tsbs_generate_data/common/distribution.go index a394d0b61..28a0cc203 100644 --- a/cmd/tsbs_generate_data/common/distribution.go +++ b/cmd/tsbs_generate_data/common/distribution.go @@ -21,7 +21,10 @@ type NormalDistribution struct { // ND creates a new normal distribution with the given mean/stddev func ND(mean, stddev float64) *NormalDistribution { - return &NormalDistribution{Mean: mean, StdDev: stddev} + return &NormalDistribution{ + Mean: mean, + StdDev: stddev, + } } // Advance advances this distribution. Since the distribution is @@ -45,7 +48,10 @@ type UniformDistribution struct { // UD creates a new uniform distribution with the given range func UD(low, high float64) *UniformDistribution { - return &UniformDistribution{Low: low, High: high} + return &UniformDistribution{ + Low: low, + High: high, + } } // Advance advances this distribution. Since the distribution is @@ -72,7 +78,10 @@ type RandomWalkDistribution struct { // WD creates a new RandomWalkDistribution based on a given distribution and starting state func WD(step Distribution, state float64) *RandomWalkDistribution { - return &RandomWalkDistribution{Step: step, State: state} + return &RandomWalkDistribution{ + Step: step, + State: state, + } } // Advance computes the next value of this distribution and stores it. @@ -147,7 +156,10 @@ func (d *MonotonicRandomWalkDistribution) Get() float64 { // MWD creates a new MonotonicRandomWalkDistribution with a given distribution and initial state func MWD(step Distribution, state float64) *MonotonicRandomWalkDistribution { - return &MonotonicRandomWalkDistribution{Step: step, State: state} + return &MonotonicRandomWalkDistribution{ + Step: step, + State: state, + } } // ConstantDistribution is a stateful distribution that always returns the same value diff --git a/cmd/tsbs_generate_data/common/simulation.go b/cmd/tsbs_generate_data/common/simulation.go index 452bd3274..bab4a2b2c 100644 --- a/cmd/tsbs_generate_data/common/simulation.go +++ b/cmd/tsbs_generate_data/common/simulation.go @@ -8,7 +8,7 @@ import ( // SimulatorConfig is an interface to create a Simulator from a time.Duration type SimulatorConfig interface { - ToSimulator(time.Duration) Simulator + NewSimulator(time.Duration, uint64) Simulator } // Simulator simulates a use case. diff --git a/cmd/tsbs_generate_data/devops/cpu_only_generate_data.go b/cmd/tsbs_generate_data/devops/cpu_only_generate_data.go index 58efad8cf..3ac7c0bf8 100644 --- a/cmd/tsbs_generate_data/devops/cpu_only_generate_data.go +++ b/cmd/tsbs_generate_data/devops/cpu_only_generate_data.go @@ -20,6 +20,7 @@ func (d *CPUOnlySimulator) Fields() map[string][][]byte { // Next advances a Point to the next state in the generator. func (d *CPUOnlySimulator) Next(p *serialize.Point) bool { + // switch to the next metric if needed if d.hostIndex == uint64(len(d.hosts)) { d.hostIndex = 0 @@ -36,8 +37,8 @@ func (d *CPUOnlySimulator) Next(p *serialize.Point) bool { // CPUOnlySimulatorConfig is used to create a CPUOnlySimulator. 
type CPUOnlySimulatorConfig commonDevopsSimulatorConfig -// ToSimulator produces a Simulator that conforms to the given SimulatorConfig over the specified interval -func (c *CPUOnlySimulatorConfig) ToSimulator(interval time.Duration) common.Simulator { +// NewSimulator produces a Simulator that conforms to the given SimulatorConfig over the specified interval +func (c *CPUOnlySimulatorConfig) NewSimulator(interval time.Duration, limit uint64) common.Simulator { hostInfos := make([]Host, c.HostCount) for i := 0; i < len(hostInfos); i++ { hostInfos[i] = c.HostConstructor(i, c.Start) @@ -45,6 +46,9 @@ func (c *CPUOnlySimulatorConfig) ToSimulator(interval time.Duration) common.Simu epochs := calculateEpochs(commonDevopsSimulatorConfig(*c), interval) maxPoints := epochs * c.HostCount + if limit > 0 && limit < maxPoints { + maxPoints = limit + } sim := &CPUOnlySimulator{&commonDevopsSimulator{ madePoints: 0, maxPoints: maxPoints, diff --git a/cmd/tsbs_generate_data/devops/cpu_only_generate_data_test.go b/cmd/tsbs_generate_data/devops/cpu_only_generate_data_test.go index e82bac4cb..b98f338ca 100644 --- a/cmd/tsbs_generate_data/devops/cpu_only_generate_data_test.go +++ b/cmd/tsbs_generate_data/devops/cpu_only_generate_data_test.go @@ -19,7 +19,7 @@ var ( ) func TestCPUOnlySimulatorFields(t *testing.T) { - s := testCPUOnlyConf.ToSimulator(time.Second).(*CPUOnlySimulator) + s := testCPUOnlyConf.NewSimulator(time.Second, 0).(*CPUOnlySimulator) fields := s.Fields() if got := len(fields); got != 1 { t.Errorf("fields length does not equal 1: got %d", got) @@ -45,7 +45,7 @@ func TestCPUOnlySimulatorFields(t *testing.T) { } func TestCPUOnlySimulatorNext(t *testing.T) { - s := testCPUOnlyConf.ToSimulator(time.Second).(*CPUOnlySimulator) + s := testCPUOnlyConf.NewSimulator(time.Second, 0).(*CPUOnlySimulator) // There are two epochs for the test configuration, and a difference of 90 // from init to final, so each epoch should add 45 devices to be written. writtenIdx := []int{10, 55, 100} @@ -77,7 +77,7 @@ func TestCPUOnlySimulatorNext(t *testing.T) { runFn(3) } -func TestCPUOnlySimulatorConfigToSimulator(t *testing.T) { +func TestCPUOnlySimulatorConfigNewSimulator(t *testing.T) { duration := time.Second start := time.Now() end := start.Add(10 * time.Second) @@ -90,7 +90,7 @@ func TestCPUOnlySimulatorConfigToSimulator(t *testing.T) { HostCount: numHosts, HostConstructor: NewHostCPUOnly, } - sim := conf.ToSimulator(duration).(*CPUOnlySimulator) + sim := conf.NewSimulator(duration, 0).(*CPUOnlySimulator) if got := sim.madePoints; got != 0 { t.Errorf("incorrect initial points: got %d want %d", got, 0) } diff --git a/cmd/tsbs_generate_data/devops/generate_data.go b/cmd/tsbs_generate_data/devops/generate_data.go index 1c4e5d1d8..145b4cb77 100644 --- a/cmd/tsbs_generate_data/devops/generate_data.go +++ b/cmd/tsbs_generate_data/devops/generate_data.go @@ -38,8 +38,8 @@ func (d *DevopsSimulator) Next(p *serialize.Point) bool { // DevopsSimulatorConfig is used to create a DevopsSimulator. 
type DevopsSimulatorConfig commonDevopsSimulatorConfig -// ToSimulator produces a Simulator that conforms to the given SimulatorConfig over the specified interval -func (d *DevopsSimulatorConfig) ToSimulator(interval time.Duration) common.Simulator { +// NewSimulator produces a Simulator that conforms to the given SimulatorConfig over the specified interval +func (d *DevopsSimulatorConfig) NewSimulator(interval time.Duration, limit uint64) common.Simulator { hostInfos := make([]Host, d.HostCount) for i := 0; i < len(hostInfos); i++ { hostInfos[i] = d.HostConstructor(i, d.Start) @@ -47,6 +47,9 @@ func (d *DevopsSimulatorConfig) ToSimulator(interval time.Duration) common.Simul epochs := calculateEpochs(commonDevopsSimulatorConfig(*d), interval) maxPoints := epochs * d.HostCount * uint64(len(hostInfos[0].SimulatedMeasurements)) + if limit > 0 && limit < maxPoints { + maxPoints = limit + } dg := &DevopsSimulator{ commonDevopsSimulator: &commonDevopsSimulator{ madePoints: 0, diff --git a/cmd/tsbs_generate_data/devops/generate_data_test.go b/cmd/tsbs_generate_data/devops/generate_data_test.go index fb4be0be4..a30fdcbe0 100644 --- a/cmd/tsbs_generate_data/devops/generate_data_test.go +++ b/cmd/tsbs_generate_data/devops/generate_data_test.go @@ -18,7 +18,7 @@ var testDevopsConf = &DevopsSimulatorConfig{ } func TestDevopsSimulatorNext(t *testing.T) { - s := testDevopsConf.ToSimulator(time.Second).(*DevopsSimulator) + s := testDevopsConf.NewSimulator(time.Second, 0).(*DevopsSimulator) // There are two epochs for the test configuration, and a difference of 90 // from init to final, so each epoch should add 45 devices to be written. writtenIdx := []int{10, 55, 100} @@ -52,7 +52,7 @@ func TestDevopsSimulatorNext(t *testing.T) { runFn(3) } -func TestDevopsSimulatorConfigToSimulator(t *testing.T) { +func TestDevopsSimulatorConfigNewSimulator(t *testing.T) { duration := time.Second start := time.Now() end := start.Add(10 * time.Second) @@ -65,7 +65,7 @@ func TestDevopsSimulatorConfigToSimulator(t *testing.T) { HostCount: numHosts, HostConstructor: NewHost, } - sim := conf.ToSimulator(duration).(*DevopsSimulator) + sim := conf.NewSimulator(duration, 0).(*DevopsSimulator) if got := sim.madePoints; got != 0 { t.Errorf("incorrect initial points: got %d want %d", got, 0) } diff --git a/cmd/tsbs_generate_data/main.go b/cmd/tsbs_generate_data/main.go index dcd72c84a..77bb7b82f 100644 --- a/cmd/tsbs_generate_data/main.go +++ b/cmd/tsbs_generate_data/main.go @@ -2,9 +2,10 @@ // // Supported formats: // Cassandra CSV format +// ClickHouse pseudo-CSV format (the same as for TimescaleDB) // InfluxDB bulk load format // MongoDB BSON format -// TimescaleDB pseudo-CSV format +// TimescaleDB pseudo-CSV format (the same as for ClickHouse) // Supported use cases: // devops: scale-var is the number of hosts to simulate, with log messages @@ -34,6 +35,7 @@ import ( const ( // Output data format choices (alphabetical order) formatCassandra = "cassandra" + formatClickhouse = "clickhouse" formatInflux = "influx" formatMongo = "mongo" formatTimescaleDB = "timescaledb" @@ -52,7 +54,13 @@ const ( // semi-constants var ( - formatChoices = []string{formatCassandra, formatInflux, formatMongo, formatTimescaleDB} + formatChoices = []string{ + formatCassandra, + formatClickhouse, + formatInflux, + formatMongo, + formatTimescaleDB, + } // allows for testing fatal = log.Fatalf ) @@ -64,7 +72,7 @@ type parseableFlagVars struct { timestampStartStr string timestampEndStr string seed int64 - initScaleVar uint64 + initialScale uint64 } // 
Program option vars: @@ -73,39 +81,46 @@ var ( useCase string profileFile string - initScaleVar uint64 - scaleVar uint64 + initialScale uint64 + scale uint64 seed int64 debug int timestampStart time.Time timestampEnd time.Time - interleavedGenerationGroupID uint - interleavedGenerationGroups uint + interleavedGenerationGroupID uint + interleavedGenerationGroupsNum uint logInterval time.Duration + limit uint64 fileName string ) +// parseTimeFromString parses string-represented time of the format 2006-01-02T15:04:05Z07:00 func parseTimeFromString(s string) time.Time { t, err := time.Parse(time.RFC3339, s) if err != nil { - fatal("%v", err) + fatal("can not parse time from string: %s %v", s, err) return time.Time{} } return t.UTC() } -func validateGroups(groupID, totalGroups uint) (bool, error) { - if totalGroups == 0 { +// validateGroups checks validity of combination groupID and totalGroups +func validateGroups(groupID, totalGroupsNum uint) (bool, error) { + if totalGroupsNum == 0 { + // Need at least one group return false, fmt.Errorf(errTotalGroupsZero) - } else if groupID >= totalGroups { - return false, fmt.Errorf(errInvalidGroupsFmt, groupID, totalGroups) + } + if groupID >= totalGroupsNum { + // Need reasonable groupID + return false, fmt.Errorf(errInvalidGroupsFmt, groupID, totalGroupsNum) } return true, nil } +// validateFormat checks whether provided data format (one of formatChoices) is valid func validateFormat(format string) bool { for _, s := range formatChoices { if s == format { @@ -115,11 +130,12 @@ func validateFormat(format string) bool { return false } +// postFlagParse assigns parseable flags func postFlagParse(flags parseableFlagVars) { - if flags.initScaleVar == 0 { - initScaleVar = scaleVar + if flags.initialScale == 0 { + initialScale = scale } else { - initScaleVar = flags.initScaleVar + initialScale = flags.initialScale } // the default seed is the current timestamp: @@ -154,12 +170,13 @@ func GetBufferedWriter(fileName string) *bufio.Writer { // Parse args: func init() { pfv := parseableFlagVars{} + flag.StringVar(&format, "format", "", fmt.Sprintf("Format to emit. (choices: %s)", strings.Join(formatChoices, ", "))) flag.StringVar(&useCase, "use-case", "", "Use case to model. (choices: devops, cpu-only)") - flag.Uint64Var(&pfv.initScaleVar, "initial-scale-var", 0, "Initial scaling variable specific to the use case (e.g., devices in 'devops'). 0 means to use -scale-var value") - flag.Uint64Var(&scaleVar, "scale-var", 1, "Scaling variable specific to the use case (e.g., devices in 'devops').") + flag.Uint64Var(&pfv.initialScale, "initial-scale-var", 0, "Initial scaling variable specific to the use case (e.g., devices in 'devops'). 0 means to use -scale-var value") + flag.Uint64Var(&scale, "scale", 1, "Scaling value specific to the use case (e.g., devices in 'devops').") flag.StringVar(&pfv.timestampStartStr, "timestamp-start", "2016-01-01T00:00:00Z", "Beginning timestamp (RFC3339).") flag.StringVar(&pfv.timestampEndStr, "timestamp-end", "2016-01-02T06:00:00Z", "Ending timestamp (RFC3339).") @@ -168,11 +185,17 @@ func init() { flag.IntVar(&debug, "debug", 0, "Debug printing (choices: 0, 1, 2). (default 0)") - flag.UintVar(&interleavedGenerationGroupID, "interleaved-generation-group-id", 0, "Group (0-indexed) to perform round-robin serialization within. Use this to scale up data generation to multiple processes.") - flag.UintVar(&interleavedGenerationGroups, "interleaved-generation-groups", 1, "The number of round-robin serialization groups. 
Use this to scale up data generation to multiple processes.") + flag.UintVar(&interleavedGenerationGroupID, + "interleaved-generation-group-id", 0, + "Group (0-indexed) to perform round-robin serialization within. Use this to scale up data generation to multiple processes.") + flag.UintVar(&interleavedGenerationGroupsNum, + "interleaved-generation-groups", 1, + "The number of round-robin serialization groups. Use this to scale up data generation to multiple processes.") + flag.StringVar(&profileFile, "profile-file", "", "File to which to write go profiling data") flag.DurationVar(&logInterval, "log-interval", 10*time.Second, "Duration between host data points") + flag.Uint64Var(&limit, "limit", 0, "Limit the number of data point to generate, 0 = no limit") flag.StringVar(&fileName, "file", "", "File name to write generated data to") flag.Parse() @@ -181,8 +204,8 @@ func init() { } func main() { - if ok, err := validateGroups(interleavedGenerationGroupID, interleavedGenerationGroups); !ok { - fatal(err.Error()) + if ok, err := validateGroups(interleavedGenerationGroupID, interleavedGenerationGroupsNum); !ok { + fatal("incorrect interleaved groups specification: %v", err) } if ok := validateFormat(format); !ok { fatal("invalid format specifier: %v (valid choices: %v)", format, formatChoices) @@ -204,14 +227,14 @@ func main() { }() cfg := getConfig(useCase) - sim := cfg.ToSimulator(logInterval) + sim := cfg.NewSimulator(logInterval, limit) serializer := getSerializer(sim, format, out) - runSimulator(sim, serializer, out, interleavedGenerationGroupID, interleavedGenerationGroups) + runSimulator(sim, serializer, out, interleavedGenerationGroupID, interleavedGenerationGroupsNum) } func runSimulator(sim common.Simulator, serializer serialize.PointSerializer, out io.Writer, groupID, totalGroups uint) { - currGroup := uint(0) + currGroupID := uint(0) point := serialize.NewPoint() for !sim.Finished() { write := sim.Next(point) @@ -221,16 +244,16 @@ func runSimulator(sim common.Simulator, serializer serialize.PointSerializer, ou } // in the default case this is always true - if currGroup == groupID { + if currGroupID == groupID { err := serializer.Serialize(point, out) if err != nil { - fatal("%v", err) + fatal("can not serialize point to out: %v", err) return } } point.Reset() - currGroup = (currGroup + 1) % totalGroups + currGroupID = (currGroupID + 1) % totalGroups } } @@ -241,8 +264,8 @@ func getConfig(useCase string) common.SimulatorConfig { Start: timestampStart, End: timestampEnd, - InitHostCount: initScaleVar, - HostCount: scaleVar, + InitHostCount: initialScale, + HostCount: scale, HostConstructor: devops.NewHost, } case useCaseCPUOnly: @@ -250,8 +273,8 @@ func getConfig(useCase string) common.SimulatorConfig { Start: timestampStart, End: timestampEnd, - InitHostCount: initScaleVar, - HostCount: scaleVar, + InitHostCount: initialScale, + HostCount: scale, HostConstructor: devops.NewHostCPUOnly, } case useCaseCPUSingle: @@ -259,8 +282,8 @@ func getConfig(useCase string) common.SimulatorConfig { Start: timestampStart, End: timestampEnd, - InitHostCount: initScaleVar, - HostCount: scaleVar, + InitHostCount: initialScale, + HostCount: scale, HostConstructor: devops.NewHostCPUSingle, } default: @@ -273,10 +296,16 @@ func getSerializer(sim common.Simulator, format string, out *bufio.Writer) seria switch format { case formatCassandra: return &serialize.CassandraSerializer{} + case formatInflux: return &serialize.InfluxSerializer{} + case formatMongo: return &serialize.MongoSerializer{} + + case 
formatClickhouse: + fallthrough + case formatTimescaleDB: out.WriteString("tags") for _, key := range devops.MachineTagKeys { @@ -303,10 +332,10 @@ func getSerializer(sim common.Simulator, format string, out *bufio.Writer) seria out.WriteString("\n") return &serialize.TimescaleDBSerializer{} - default: - fatal("unknown format: '%s'", format) - return nil } + + fatal("unknown format: '%s'", format) + return nil } // startMemoryProfile sets up memory profiling to be written to profileFile. It diff --git a/cmd/tsbs_generate_data/main_test.go b/cmd/tsbs_generate_data/main_test.go index 36c5c5e46..2fcb17ed4 100644 --- a/cmd/tsbs_generate_data/main_test.go +++ b/cmd/tsbs_generate_data/main_test.go @@ -92,18 +92,18 @@ func TestValidateFormat(t *testing.T) { } func TestPostFlagsParse(t *testing.T) { - scaleVar = 100 + scale = 100 timestampStart = time.Time{} timestampEnd = time.Time{} boringPFV := parseableFlagVars{ - initScaleVar: 1, + initialScale: 1, seed: 123, timestampStartStr: correctTimeStr, timestampEndStr: correctTimeStr, } postFlagParse(boringPFV) - if initScaleVar != boringPFV.initScaleVar { - t.Errorf("specified initScaleVar not set correctly: got %d", initScaleVar) + if initialScale != boringPFV.initialScale { + t.Errorf("specified initScaleVar not set correctly: got %d", initialScale) } if seed != boringPFV.seed { t.Errorf("specified seed not set correctly: got %d", seed) @@ -117,19 +117,19 @@ func TestPostFlagsParse(t *testing.T) { // initScaleVar should set to the same as scaleVar testPFV := parseableFlagVars{ - initScaleVar: 0, + initialScale: 0, seed: boringPFV.seed, timestampStartStr: boringPFV.timestampStartStr, timestampEndStr: boringPFV.timestampEndStr, } postFlagParse(testPFV) - if initScaleVar != scaleVar { - t.Errorf("initScaleVar = 0 not parsed correctly: got %d", initScaleVar) + if initialScale != scale { + t.Errorf("initScaleVar = 0 not parsed correctly: got %d", initialScale) } // seed should set to current time testPFV = parseableFlagVars{ - initScaleVar: boringPFV.initScaleVar, + initialScale: boringPFV.initialScale, seed: 0, timestampStartStr: boringPFV.timestampStartStr, timestampEndStr: boringPFV.timestampEndStr, @@ -146,7 +146,7 @@ func TestPostFlagsParse(t *testing.T) { fatalCalled = true } testPFV = parseableFlagVars{ - initScaleVar: boringPFV.initScaleVar, + initialScale: boringPFV.initialScale, seed: boringPFV.seed, timestampStartStr: incorrectTimeStr, timestampEndStr: boringPFV.timestampEndStr, @@ -157,7 +157,7 @@ func TestPostFlagsParse(t *testing.T) { } testPFV = parseableFlagVars{ - initScaleVar: boringPFV.initScaleVar, + initialScale: boringPFV.initialScale, seed: boringPFV.seed, timestampStartStr: boringPFV.timestampStartStr, timestampEndStr: incorrectTimeStr, @@ -351,7 +351,7 @@ func TestGetConfig(t *testing.T) { func TestGetSerializer(t *testing.T) { cfg := getConfig(useCaseCPUOnly) - sim := cfg.ToSimulator(logInterval) + sim := cfg.NewSimulator(logInterval, 0) buf := bytes.NewBuffer(make([]byte, 1024)) out := bufio.NewWriter(buf) defer out.Flush() diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/devops.go b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go new file mode 100644 index 000000000..b32f8c373 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go @@ -0,0 +1,390 @@ +package clickhouse + +import ( + "fmt" + "strings" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/query" +) + +// Devops produces ClickHouse-specific queries for all the devops 
query types.
+type Devops struct {
+	*devops.Core
+	UseTags bool
+}
+
+// NewDevops makes a Devops object ready to generate Queries.
+func NewDevops(start, end time.Time, scale int) *Devops {
+	return &Devops{
+		devops.NewCore(start, end, scale),
+		false,
+	}
+}
+
+// GenerateEmptyQuery returns an empty query.ClickHouse
+func (d *Devops) GenerateEmptyQuery() query.Query {
+	return query.NewClickHouse()
+}
+
+// getHostWhereWithHostnames creates a WHERE SQL statement for multiple hostnames.
+// NB 'WHERE' itself is not included, just the hostname filter clauses, ready to be concatenated to a 'WHERE' string
+func (d *Devops) getHostWhereWithHostnames(hostnames []string) string {
+	hostnameSelectionClauses := []string{}
+
+	if d.UseTags {
+		// Tags are kept in a separate table, so prepare a WHERE clause against the `tags` table:
+		// tags_id IN (SELECT id FROM tags WHERE hostname IN ('host1', 'host2', ...))
+		for _, s := range hostnames {
+			hostnameSelectionClauses = append(hostnameSelectionClauses, fmt.Sprintf("'%s'", s))
+		}
+		return fmt.Sprintf("tags_id IN (SELECT id FROM tags WHERE hostname IN (%s))", strings.Join(hostnameSelectionClauses, ","))
+	}
+
+	// Here we DO NOT use tags as a separate table.
+	// Hostname is embedded into the processed table itself, so we can build a direct WHERE clause:
+	// (hostname = 'host1' OR hostname = 'host2' ...)
+	for _, s := range hostnames {
+		hostnameSelectionClauses = append(hostnameSelectionClauses, fmt.Sprintf("hostname = '%s'", s))
+	}
+	return "(" + strings.Join(hostnameSelectionClauses, " OR ") + ")"
+}
+
+// getHostWhereString gets multiple random hostnames and creates a WHERE SQL statement for them.
+func (d *Devops) getHostWhereString(nhosts int) string {
+	hostnames := d.GetRandomHosts(nhosts)
+	return d.getHostWhereWithHostnames(hostnames)
+}
+
+// getSelectClausesAggMetrics builds the specified aggregate function clauses for multiple metrics.
+// Ex.: max(cpu_time) AS max_cpu_time
+func (d *Devops) getSelectClausesAggMetrics(aggregateFunction string, metrics []string) []string {
+	selectAggregateClauses := make([]string, len(metrics))
+	for i, metric := range metrics {
+		selectAggregateClauses[i] = fmt.Sprintf("%[1]s(%[2]s) AS %[1]s_%[2]s", aggregateFunction, metric)
+	}
+	return selectAggregateClauses
+}
+
+// ClickHouse understands and can compare time presented as strings of this format
+const clickhouseTimeStringFormat = "2006-01-02 15:04:05"
+
+// GroupByTime selects the MAX for numMetrics metrics under 'cpu',
+// per minute for nhosts hosts,
+// e.g. in pseudo-SQL:
+//
+// SELECT minute, max(metric1), ..., max(metricN)
+// FROM cpu
+// WHERE
+//     (hostname = '$HOSTNAME_1' OR ...
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// GROUP BY minute +// ORDER BY minute ASC +func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { + interval := d.Interval.RandWindow(timeRange) + metrics := devops.GetCPUMetricsSlice(numMetrics) + selectClauses := d.getSelectClausesAggMetrics("max", metrics) + + sql := fmt.Sprintf(` + SELECT + toStartOfMinute(created_at) AS minute, + %s + FROM + cpu + WHERE + %s + AND created_at >= '%s' + AND created_at < '%s' + GROUP BY + minute + ORDER BY + minute ASC + `, + strings.Join(selectClauses, ", "), + d.getHostWhereString(nHosts), + interval.Start.Format(clickhouseTimeStringFormat), + interval.End.Format(clickhouseTimeStringFormat)) + + humanLabel := fmt.Sprintf("ClickHouse %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit: +// SELECT time_bucket('1 minute', time) AS t, MAX(cpu) +// FROM cpu +// WHERE time < '$TIME' +// GROUP BY t +// ORDER BY t DESC +// LIMIT $LIMIT +func (d *Devops) GroupByOrderByLimit(qi query.Query) { + interval := d.Interval.RandWindow(time.Hour) + sql := fmt.Sprintf(` + SELECT + toStartOfMinute(created_at) AS minute, + max(usage_user) + FROM + cpu + WHERE + created_at < '%s' + GROUP BY + minute + ORDER BY + minute DESC + LIMIT + 5 + `, + interval.End.Format(clickhouseTimeStringFormat)) + + humanLabel := "ClickHouse max cpu over last 5 min-intervals (random end)" + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day, +// e.g. 
in pseudo-SQL: +// +// SELECT AVG(metric1), ..., AVG(metricN) +// FROM cpu +// WHERE time >= '$HOUR_START' AND time < '$HOUR_END' +// GROUP BY hour, hostname +// ORDER BY hour +func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { + metrics := devops.GetCPUMetricsSlice(numMetrics) + interval := d.Interval.RandWindow(devops.DoubleGroupByDuration) + + selectClauses := make([]string, numMetrics) + meanClauses := make([]string, numMetrics) + for i, m := range metrics { + meanClauses[i] = "mean_" + m + selectClauses[i] = fmt.Sprintf("avg(%s) AS %s", m, meanClauses[i]) + } + + hostnameField := "hostname" + joinClause := "" + if d.UseTags { + //hostnameField = "tags.hostname" + hostnameField = "hostname" + joinClause = "ANY INNER JOIN tags USING id" + } + + // SELECT + // hour, + // hostname, + // mean_usage_user + // FROM ( + // SELECT + // toStartOfHour(created_at) AS hour, + // tags_id AS id, + // avg(usage_user) AS mean_usage_user + // FROM + // cpu + // WHERE + // created_at >= '2016-01-02 11:22:40' + // AND created_at < '2016-01-02 23:22:40' + // GROUP BY + // hour, + // id + // ) AS cpu_avg + // ANY INNER JOIN tags USING id + // ORDER BY + // hour, + // hostname + + sql := fmt.Sprintf(` + SELECT + hour, + %s, + %s + FROM ( + SELECT + toStartOfHour(created_at) AS hour, + tags_id AS id, + %s + FROM + cpu + WHERE + created_at >= '%s' + AND created_at < '%s' + GROUP BY + hour, + id + ) AS cpu_avg + %s + ORDER BY + hour, + %s + `, + hostnameField, // main SELECT %s, + strings.Join(meanClauses, ", "), // main SELECT %s + strings.Join(selectClauses, ", "), // cpu_avg SELECT %s + interval.Start.Format(clickhouseTimeStringFormat), // cpu_avg time >= '%s' + interval.End.Format(clickhouseTimeStringFormat), // cpu_avg time < '%s' + joinClause, // JOIN clause + hostnameField) // ORDER BY %s + + humanLabel := devops.GetDoubleGroupByLabel("ClickHouse", numMetrics) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts, +// e.g. in pseudo-SQL: +// +// SELECT MAX(metric1), ..., MAX(metricN) +// FROM cpu +// WHERE +// (hostname = '$HOSTNAME_1' OR ... 
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// GROUP BY hour +// ORDER BY hour +func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { + interval := d.Interval.RandWindow(devops.MaxAllDuration) + metrics := devops.GetAllCPUMetrics() + selectClauses := d.getSelectClausesAggMetrics("max", metrics) + + sql := fmt.Sprintf(` + SELECT + toStartOfHour(created_at) AS hour, + %s + FROM + cpu + WHERE + %s + AND created_at >= '%s' + AND created_at < '%s' + GROUP BY + hour + ORDER BY + hour + `, + strings.Join(selectClauses, ", "), + d.getHostWhereString(nHosts), + interval.Start.Format(clickhouseTimeStringFormat), + interval.End.Format(clickhouseTimeStringFormat)) + + humanLabel := devops.GetMaxAllLabel("ClickHouse", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// LastPointPerHost finds the last row for every host in the dataset +func (d *Devops) LastPointPerHost(qi query.Query) { + var sql string + if d.UseTags { + sql = fmt.Sprintf(` + SELECT + * + FROM + ( + SELECT + * + FROM + cpu + WHERE + (tags_id, created_at) IN ( + SELECT + tags_id, + max(created_at) + FROM + cpu + GROUP BY + tags_id + ) + ) c + ANY INNER JOIN + tags t ON c.tags_id = t.id + ORDER BY + t.hostname ASC, + c.time DESC + `) + // SQL-like syntax would be + // SELECT + // DISTINCT(t.hostname), * + // FROM + // tags AS t + // INNER JOIN ( + // SELECT + // * + // FROM + // cpu AS c + // WHERE + // c.tags_id = t.id + // ORDER BY + // created_at DESC + // LIMIT + // 1 + // ) AS b ON true + // ORDER BY + // t.hostname, + // b.time DESC + } else { + sql = fmt.Sprintf(` + SELECT + DISTINCT(hostname), * + FROM + cpu + ORDER BY + hostname ASC, + created_at DESC + `) + } + + humanLabel := "ClickHouse last row per host" + humanDesc := humanLabel + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high +// usage between a time period for a number of hosts (if 0, it will search all hosts), +// e.g. in pseudo-SQL: +// +// SELECT * FROM cpu +// WHERE usage_user > 90.0 +// AND time >= '$TIME_START' AND time < '$TIME_END' +// AND (hostname = '$HOST' OR hostname = '$HOST2'...) 
+func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { + var hostWhereClause string + if nHosts == 0 { + hostWhereClause = "" + } else { + hostWhereClause = fmt.Sprintf("AND %s", d.getHostWhereString(nHosts)) + } + interval := d.Interval.RandWindow(devops.HighCPUDuration) + + sql := fmt.Sprintf(` + SELECT + * + FROM + cpu + WHERE + usage_user > 90.0 + AND created_at >= '%s' + AND created_at < '%s' + %s + `, + interval.Start.Format(clickhouseTimeStringFormat), + interval.End.Format(clickhouseTimeStringFormat), + hostWhereClause) + + humanLabel := devops.GetHighCPULabel("ClickHouse", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// fill Query fills the query struct with data +func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, sql string) { + q := qi.(*query.ClickHouse) + q.HumanLabel = []byte(humanLabel) + q.HumanDescription = []byte(humanDesc) + q.Table = []byte("cpu") + q.SqlQuery = []byte(sql) +} diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go b/cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go new file mode 100644 index 000000000..ff110dd00 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/clickhouse/devops_test.go @@ -0,0 +1,86 @@ +package clickhouse + +import ( + "strings" + "testing" + "time" +) + +func TestDevopsGetHostWhereWithHostnames(t *testing.T) { + cases := []struct { + desc string + hostnames []string + useTags bool + want string + }{ + { + desc: "single host - no json or tags", + hostnames: []string{"foo1"}, + useTags: false, + want: "(hostname = 'foo1')", + }, + { + desc: "multi host - no json or tags", + hostnames: []string{"foo1", "foo2"}, + useTags: false, + want: "(hostname = 'foo1' OR hostname = 'foo2')", + }, + { + desc: "single host - w/ tags", + hostnames: []string{"foo1"}, + useTags: true, + want: "tags_id IN (SELECT id FROM tags WHERE hostname IN ('foo1'))", + }, + { + desc: "multi host - w/ tags", + hostnames: []string{"foo1", "foo2"}, + useTags: true, + want: "tags_id IN (SELECT id FROM tags WHERE hostname IN ('foo1','foo2'))", + }, + } + + for _, c := range cases { + d := NewDevops(time.Now(), time.Now(), 10) + d.UseTags = c.useTags + + if got := d.getHostWhereWithHostnames(c.hostnames); got != c.want { + t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want) + } + } +} + +func TestDevopsGetSelectClausesAggMetrics(t *testing.T) { + cases := []struct { + desc string + agg string + metrics []string + want string + }{ + { + desc: "single metric - max", + agg: "max", + metrics: []string{"foo"}, + want: "max(foo) AS max_foo", + }, + { + desc: "multiple metric - max", + agg: "max", + metrics: []string{"foo", "bar"}, + want: "max(foo) AS max_foo,max(bar) AS max_bar", + }, + { + desc: "multiple metric - avg", + agg: "avg", + metrics: []string{"foo", "bar"}, + want: "avg(foo) AS avg_foo,avg(bar) AS avg_bar", + }, + } + + for _, c := range cases { + d := NewDevops(time.Now(), time.Now(), 10) + + if got := strings.Join(d.getSelectClausesAggMetrics(c.agg, c.metrics), ","); got != c.want { + t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want) + } + } +} diff --git a/cmd/tsbs_generate_queries/databases/timescaledb/devops.go b/cmd/tsbs_generate_queries/databases/timescaledb/devops.go index f443e79be..5dd696021 100644 --- a/cmd/tsbs_generate_queries/databases/timescaledb/devops.go +++ b/cmd/tsbs_generate_queries/databases/timescaledb/devops.go @@ -26,6 +26,8 @@ func (d *Devops) 
GenerateEmptyQuery() query.Query { return query.NewTimescaleDB() } +// getHostWhereWithHostnames creates WHERE SQL statement for multiple hostnames. +// NB 'WHERE' itself is not included, just hostname filter clauses, ready to concatenate to 'WHERE' string func (d *Devops) getHostWhereWithHostnames(hostnames []string) string { hostnameClauses := []string{} if d.UseJSON { @@ -48,6 +50,7 @@ func (d *Devops) getHostWhereWithHostnames(hostnames []string) string { } } +// getHostWhereString gets multiple random hostnames and create WHERE SQL statement for these hostnames. func (d *Devops) getHostWhereString(nhosts int) string { hostnames := d.GetRandomHosts(nhosts) return d.getHostWhereWithHostnames(hostnames) @@ -70,19 +73,32 @@ const goTimeFmt = "2006-01-02 15:04:05.999999 -0700" // // SELECT minute, max(metric1), ..., max(metricN) // FROM cpu -// WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') -// AND time >= '$HOUR_START' AND time < '$HOUR_END' -// GROUP BY minute ORDER BY minute ASC +// WHERE +// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// GROUP BY minute +// ORDER BY minute ASC func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { interval := d.Interval.RandWindow(timeRange) metrics := devops.GetCPUMetricsSlice(numMetrics) selectClauses := d.getSelectClausesAggMetrics("max", metrics) - sql := fmt.Sprintf(`SELECT time_bucket('1 minute', time) AS minute, - %s - FROM cpu - WHERE %s AND time >= '%s' AND time < '%s' - GROUP BY minute ORDER BY minute ASC`, + sql := fmt.Sprintf(` + SELECT + time_bucket('1 minute', time) AS minute, + %s + FROM + cpu + WHERE + %s + AND time >= '%s' + AND time < '%s' + GROUP BY + minute + ORDER BY + minute ASC + `, strings.Join(selectClauses, ", "), d.getHostWhereString(nHosts), interval.Start.Format(goTimeFmt), @@ -94,16 +110,29 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t } // GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit: -// SELECT time_bucket('1 minute', time) AS t, MAX(cpu) FROM cpu +// SELECT time_bucket('1 minute', time) AS t, MAX(cpu) +// FROM cpu // WHERE time < '$TIME' -// GROUP BY t ORDER BY t DESC +// GROUP BY t +// ORDER BY t DESC // LIMIT $LIMIT func (d *Devops) GroupByOrderByLimit(qi query.Query) { interval := d.Interval.RandWindow(time.Hour) - timeStr := interval.End.Format(goTimeFmt) - - where := fmt.Sprintf("WHERE time < '%s'", timeStr) - sql := fmt.Sprintf(`SELECT time_bucket('1 minute', time) AS minute, max(usage_user) FROM cpu %s GROUP BY minute ORDER BY minute DESC LIMIT 5`, where) + sql := fmt.Sprintf(` + SELECT + time_bucket('1 minute', time) AS minute, + max(usage_user) + FROM + cpu + WHERE + time < '%s' + GROUP BY + minute + ORDER BY + minute DESC + LIMIT 5 + `, + interval.End.Format(goTimeFmt)) humanLabel := "TimescaleDB max cpu over last 5 min-intervals (random end)" humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) @@ -116,7 +145,8 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { // SELECT AVG(metric1), ..., AVG(metricN) // FROM cpu // WHERE time >= '$HOUR_START' AND time < '$HOUR_END' -// GROUP BY hour, hostname ORDER BY hour +// GROUP BY hour, hostname +// ORDER BY hour func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { metrics := devops.GetCPUMetricsSlice(numMetrics) interval := 
d.Interval.RandWindow(devops.DoubleGroupByDuration) @@ -141,45 +171,78 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { sql := fmt.Sprintf(` WITH cpu_avg AS ( - SELECT time_bucket('1 hour', time) as hour, tags_id, - %s - FROM cpu - WHERE time >= '%s' AND time < '%s' - GROUP BY hour, tags_id + SELECT + time_bucket('1 hour', time) as hour, + tags_id, + %s + FROM + cpu + WHERE + time >= '%s' + AND time < '%s' + GROUP BY + hour, + tags_id ) - SELECT hour, %s, %s - FROM cpu_avg - %s - ORDER BY hour, %s`, + SELECT + hour, + %s, + %s + FROM + cpu_avg + %s + ORDER BY + hour, + %s + `, strings.Join(selectClauses, ", "), - interval.Start.Format(goTimeFmt), interval.End.Format(goTimeFmt), - hostnameField, strings.Join(meanClauses, ", "), - joinStr, hostnameField) + interval.Start.Format(goTimeFmt), + interval.End.Format(goTimeFmt), + hostnameField, + strings.Join(meanClauses, ", "), + joinStr, // JOIN + hostnameField) + humanLabel := devops.GetDoubleGroupByLabel("TimescaleDB", numMetrics) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) d.fillInQuery(qi, humanLabel, humanDesc, sql) } // MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts, -// e.g. in psuedo-SQL: +// e.g. in pseudo-SQL: // // SELECT MAX(metric1), ..., MAX(metricN) -// FROM cpu WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') -// AND time >= '$HOUR_START' AND time < '$HOUR_END' -// GROUP BY hour ORDER BY hour +// FROM cpu +// WHERE +// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// GROUP BY hour +// ORDER BY hour func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { interval := d.Interval.RandWindow(devops.MaxAllDuration) metrics := devops.GetAllCPUMetrics() selectClauses := d.getSelectClausesAggMetrics("max", metrics) - sql := fmt.Sprintf(`SELECT time_bucket('1 hour', time) AS hour, - %s - FROM cpu - WHERE %s AND time >= '%s' AND time < '%s' - GROUP BY hour ORDER BY hour`, + sql := fmt.Sprintf(` + SELECT + time_bucket('1 hour', time) AS hour, + %s + FROM + cpu + WHERE + %s + AND time >= '%s' + AND time < '%s' + GROUP BY + hour + ORDER BY + hour + `, strings.Join(selectClauses, ", "), d.getHostWhereString(nHosts), - interval.Start.Format(goTimeFmt), interval.End.Format(goTimeFmt)) + interval.Start.Format(goTimeFmt), + interval.End.Format(goTimeFmt)) humanLabel := devops.GetMaxAllLabel("TimescaleDB", nHosts) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) @@ -190,11 +253,61 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { func (d *Devops) LastPointPerHost(qi query.Query) { var sql string if d.UseTags { - sql = fmt.Sprintf("SELECT DISTINCT ON (t.hostname) * FROM tags t INNER JOIN LATERAL(SELECT * FROM cpu c WHERE c.tags_id = t.id ORDER BY time DESC LIMIT 1) AS b ON true ORDER BY t.hostname, b.time DESC") + sql = fmt.Sprintf(` + SELECT + DISTINCT ON (t.hostname) * + FROM + tags t + INNER JOIN LATERAL ( + SELECT + * + FROM + cpu c + WHERE + c.tags_id = t.id + ORDER BY + time DESC + LIMIT + 1 + ) AS b ON true + ORDER BY + t.hostname, + b.time DESC + `) + } else if d.UseJSON { - sql = fmt.Sprintf("SELECT DISTINCT ON (t.tagset->>'hostname') * FROM tags t INNER JOIN LATERAL(SELECT * FROM cpu c WHERE c.tags_id = t.id ORDER BY time DESC LIMIT 1) AS b ON true ORDER BY t.tagset->>'hostname', b.time DESC") + sql = fmt.Sprintf(` + SELECT + DISTINCT ON (t.tagset->>'hostname') * + FROM + tags t + INNER JOIN LATERAL ( + SELECT + * + FROM + cpu c 
+			WHERE
+				c.tags_id = t.id
+			ORDER BY
+				time DESC
+			LIMIT
+				1
+		) AS b ON true
+		ORDER BY
+			t.tagset->>'hostname',
+			b.time DESC
+		`)
+
 	} else {
-		sql = fmt.Sprintf(`SELECT DISTINCT ON (hostname) * FROM cpu ORDER BY hostname, time DESC`)
+		sql = fmt.Sprintf(`
+		SELECT
+			DISTINCT ON (hostname) *
+		FROM
+			cpu
+		ORDER BY
+			hostname,
+			time DESC
+		`)
 	}
 
 	humanLabel := "TimescaleDB last row per host"
@@ -219,14 +332,27 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
 	}
 	interval := d.Interval.RandWindow(devops.HighCPUDuration)
 
-	sql := fmt.Sprintf(`SELECT * FROM cpu WHERE usage_user > 90.0 and time >= '%s' AND time < '%s' %s`,
-		interval.Start.Format(goTimeFmt), interval.End.Format(goTimeFmt), hostWhereClause)
+	sql := fmt.Sprintf(`
+	SELECT
+		*
+	FROM
+		cpu
+	WHERE
+		usage_user > 90.0
+		AND time >= '%s'
+		AND time < '%s'
+		%s
+	`,
+		interval.Start.Format(goTimeFmt),
+		interval.End.Format(goTimeFmt),
+		hostWhereClause)
 
 	humanLabel := devops.GetHighCPULabel("TimescaleDB", nHosts)
 	humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
 	d.fillInQuery(qi, humanLabel, humanDesc, sql)
 }
 
+// fillInQuery fills the query struct with data
 func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, sql string) {
 	q := qi.(*query.TimescaleDB)
 	q.HumanLabel = []byte(humanLabel)
diff --git a/cmd/tsbs_generate_queries/main.go b/cmd/tsbs_generate_queries/main.go
index 3b3b2f683..737ed4ada 100644
--- a/cmd/tsbs_generate_queries/main.go
+++ b/cmd/tsbs_generate_queries/main.go
@@ -19,6 +19,7 @@ import (
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/timescaledb"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/clickhouse"
 )
 
 var useCaseMatrix = map[string]map[string]utils.QueryFillerMaker{
@@ -59,6 +60,8 @@ var (
 	timescaleUseJSON bool
 	timescaleUseTags bool
 
+	clickhouseUseTags bool
+
 	interleavedGenerationGroupID uint
 	interleavedGenerationGroups  uint
 )
@@ -66,6 +69,10 @@ var (
 func getGenerator(format string, start, end time.Time, scale int) utils.DevopsGenerator {
 	if format == "cassandra" {
 		return cassandra.NewDevops(start, end, scale)
+	} else if format == "clickhouse" {
+		tgen := clickhouse.NewDevops(start, end, scale)
+		tgen.UseTags = clickhouseUseTags
+		return tgen
 	} else if format == "influx" {
 		return influx.NewDevops(start, end, scale)
 	} else if format == "mongo" {
@@ -115,19 +122,25 @@ func init() {
 		}
 	}
 
-	var useCase, queryType, format, timestampStartStr, timestampEndStr string
-	var scaleVar int
+	var format string
+	var useCase string
+	var queryType string
+	var scale int
+	var timestampStartStr string
+	var timestampEndStr string
 
 	flag.StringVar(&format, "format", "", "Format to emit. (Choices are in the use case matrix.)")
 	flag.StringVar(&useCase, "use-case", "", "Use case to model. (Choices are in the use case matrix.)")
 	flag.StringVar(&queryType, "query-type", "", "Query type.
(Choices are in the use case matrix.)") - flag.IntVar(&scaleVar, "scale-var", 1, "Scaling variable (must be the equal to the scalevar used for data generation).") + flag.IntVar(&scale, "scale", 1, "Scaling variable (must be the equal to the scalevar used for data generation).") flag.IntVar(&queryCount, "queries", 1000, "Number of queries to generate.") flag.BoolVar(×caleUseJSON, "timescale-use-json", false, "TimescaleDB only: Use separate JSON tags table when querying") flag.BoolVar(×caleUseTags, "timescale-use-tags", true, "TimescaleDB only: Use separate tags table when querying") + flag.BoolVar(&clickhouseUseTags, "clickhouse-use-tags", true, "ClickHouse only: Use separate tags table when querying") + flag.StringVar(×tampStartStr, "timestamp-start", "2016-01-01T00:00:00Z", "Beginning timestamp (RFC3339).") flag.StringVar(×tampEndStr, "timestamp-end", "2016-01-02T06:00:00Z", "Ending timestamp (RFC3339).") @@ -173,7 +186,7 @@ func init() { timestampEnd = timestampEnd.UTC() // Make the query generator: - generator = getGenerator(format, timestampStart, timestampEnd, scaleVar) + generator = getGenerator(format, timestampStart, timestampEnd, scale) filler = useCaseMatrix[useCase][queryType](generator) } diff --git a/cmd/tsbs_generate_queries/uses/devops/common.go b/cmd/tsbs_generate_queries/uses/devops/common.go index 2b0132a16..16bf52ebc 100644 --- a/cmd/tsbs_generate_queries/uses/devops/common.go +++ b/cmd/tsbs_generate_queries/uses/devops/common.go @@ -22,22 +22,22 @@ const ( // DoubleGroupByDuration is the how big the time range for DoubleGroupBy query is DoubleGroupByDuration = 12 * time.Hour // HighCPUDuration is the how big the time range for HighCPU query is - HighCPUDuration = 12 * time.Hour + HighCPUDuration = 12 * time.Hour // MaxAllDuration is the how big the time range for MaxAll query is - MaxAllDuration = 8 * time.Hour + MaxAllDuration = 8 * time.Hour // LabelSingleGroupby is the label prefix for queries of the single groupby variety - LabelSingleGroupby = "single-groupby" + LabelSingleGroupby = "single-groupby" // LabelDoubleGroupby is the label prefix for queries of the double groupby variety - LabelDoubleGroupby = "double-groupby" + LabelDoubleGroupby = "double-groupby" // LabelLastpoint is the label for the lastpoint query - LabelLastpoint = "lastpoint" + LabelLastpoint = "lastpoint" // LabelMaxAll is the label prefix for queries of the max all variety - LabelMaxAll = "cpu-max-all" + LabelMaxAll = "cpu-max-all" // LabelGroupbyOrderbyLimit is the label for groupby-orderby-limit query LabelGroupbyOrderbyLimit = "groupby-orderby-limit" // LabelHighCPU is the prefix for queries of the high-CPU variety - LabelHighCPU = "high-cpu" + LabelHighCPU = "high-cpu" ) // for ease of testing @@ -47,6 +47,7 @@ var fatal = log.Fatalf type Core struct { // Interval is the entire time range of the dataset Interval utils.TimeInterval + // Scale is the cardinality of the dataset in terms of devices/hosts Scale int } @@ -58,12 +59,15 @@ func NewCore(start, end time.Time, scale int) *Core { return nil } - return &Core{utils.NewTimeInterval(start, end), scale} + return &Core{ + utils.NewTimeInterval(start, end), + scale, + } } // GetRandomHosts returns a random set of nHosts from a given Core func (d *Core) GetRandomHosts(nHosts int) []string { - return getRandomHosts(d.Scale, nHosts) + return getRandomHosts(nHosts, d.Scale) } // cpuMetrics is the list of metric names for CPU @@ -157,41 +161,45 @@ func GetMaxAllLabel(dbName string, nHosts int) string { return fmt.Sprintf("%s max of all CPU 
metrics, random %4d hosts, random %s by 1h", dbName, nHosts, MaxAllDuration)
 }
 
-func getRandomHosts(scale, nHosts int) []string {
-	if nHosts < 1 {
-		fatal("number of hosts cannot be < 1; got %d", nHosts)
+// getRandomHosts returns a subset of numHosts hostnames out of a permutation of hostnames,
+// numbered from 0 to totalHostsScale.
+// Ex.: host_12, host_7, host_22 for numHosts=3 and totalHostsScale=30
+func getRandomHosts(numHosts int, totalHostsScale int) []string {
+	if numHosts < 1 {
+		fatal("number of hosts cannot be < 1; got %d", numHosts)
 		return nil
 	}
-	if nHosts > scale {
-		fatal("number of hosts (%d) larger than --scale-var (%d)", nHosts, scale)
+	if numHosts > totalHostsScale {
+		fatal("number of hosts (%d) larger than total hosts (%d); see --scale", numHosts, totalHostsScale)
 		return nil
 	}
 
-	nn := getRandomSubsetPerm(scale, nHosts)
+	randomNumbers := getRandomSubsetPermutation(numHosts, totalHostsScale)
 	hostnames := []string{}
-	for _, n := range nn {
+	for _, n := range randomNumbers {
 		hostnames = append(hostnames, fmt.Sprintf("host_%d", n))
 	}
 
 	return hostnames
 }
 
-// getRandomSubsetPerm returns a subset of nItems of a permutation of numbers
-// from 0 to scale. This is an alternative to rand.Perm and then taking a
-// sub-slice, which used up a lot more memory and slowed down query generation
-// significantly. The subset of the permutation should have no duplicates.
-func getRandomSubsetPerm(scale, nItems int) []int {
-	if nItems > scale {
+// getRandomSubsetPermutation returns a subset of numItems of a permutation of numbers from 0 to totalItemsScale.
+// This is an alternative to rand.Perm and then taking a sub-slice, which used up a lot more memory
+// and slowed down query generation significantly.
+// The subset of the permutation should have no duplicates.
+func getRandomSubsetPermutation(numItems int, totalItemsScale int) []int {
+	if numItems > totalItemsScale {
+		// Not enough items to choose from - can't do it
 		fatal(errMoreItemsThanScale)
 		return nil
 	}
 	seen := map[int]bool{}
 	res := []int{}
-	for i := 0; i < nItems; i++ {
+	for i := 0; i < numItems; i++ {
 		for {
-			n := rand.Intn(scale)
+			n := rand.Intn(totalItemsScale)
 			// Keep iterating until a previously unseen int is found
 			if !seen[n] {
 				seen[n] = true
diff --git a/cmd/tsbs_generate_queries/uses/devops/common_test.go b/cmd/tsbs_generate_queries/uses/devops/common_test.go
index cbdc11bea..dccfb4109 100644
--- a/cmd/tsbs_generate_queries/uses/devops/common_test.go
+++ b/cmd/tsbs_generate_queries/uses/devops/common_test.go
@@ -140,7 +140,7 @@ func TestGetRandomHosts(t *testing.T) {
 			scale:       1,
 			nHosts:      5,
 			shouldFatal: true,
-			wantFatal:   "number of hosts (5) larger than --scale-var (1)",
+			wantFatal:   "number of hosts (5) larger than total hosts (1); see --scale",
 		},
 	}
 
@@ -151,7 +151,7 @@ func TestGetRandomHosts(t *testing.T) {
 		fatal = func(format string, args ...interface{}) {
 			errMsg = fmt.Sprintf(format, args...)
} - hosts := getRandomHosts(c.scale, c.nHosts) + hosts := getRandomHosts(c.nHosts, c.scale) if hosts != nil { t.Errorf("%s: fatal'd but with non-nil return: %v", c.desc, hosts) } @@ -159,7 +159,7 @@ func TestGetRandomHosts(t *testing.T) { t.Errorf("%s: incorrect fatal msg:\ngot\n%s\nwant\n%s", c.desc, errMsg, c.wantFatal) } } else { - hosts := getRandomHosts(c.scale, c.nHosts) + hosts := getRandomHosts(c.nHosts, c.scale) if got := strings.Join(hosts, ","); got != c.want { t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want) } @@ -237,7 +237,7 @@ func TestGetRandomSubsetPerm(t *testing.T) { } for _, c := range cases { - ret := getRandomSubsetPerm(c.scale, c.nItems) + ret := getRandomSubsetPermutation(c.nItems, c.scale) if len(ret) != c.nItems { t.Errorf("return list not long enough: got %d want %d (scale %d)", len(ret), c.nItems, c.scale) } @@ -257,7 +257,7 @@ func TestGetRandomSubsetPermError(t *testing.T) { fatal = func(format string, args ...interface{}) { errMsg = fmt.Sprintf(format, args...) } - ret := getRandomSubsetPerm(10, 11) + ret := getRandomSubsetPermutation(11, 10) if ret != nil { t.Errorf("return was non-nil: %v", ret) } diff --git a/cmd/tsbs_generate_queries/uses/devops/groupby_orderby_limit.go b/cmd/tsbs_generate_queries/uses/devops/groupby_orderby_limit.go index e2603f0ee..710000f64 100644 --- a/cmd/tsbs_generate_queries/uses/devops/groupby_orderby_limit.go +++ b/cmd/tsbs_generate_queries/uses/devops/groupby_orderby_limit.go @@ -12,7 +12,9 @@ type GroupByOrderByLimit struct { // NewGroupByOrderByLimit returns a new GroupByOrderByLimit for given paremeters func NewGroupByOrderByLimit(core utils.DevopsGenerator) utils.QueryFiller { - return &GroupByOrderByLimit{core} + return &GroupByOrderByLimit{ + core, + } } // Fill fills in the query.Query with query details diff --git a/cmd/tsbs_generate_queries/uses/devops/lastpoint.go b/cmd/tsbs_generate_queries/uses/devops/lastpoint.go index 37d13f22e..c22368f24 100644 --- a/cmd/tsbs_generate_queries/uses/devops/lastpoint.go +++ b/cmd/tsbs_generate_queries/uses/devops/lastpoint.go @@ -12,7 +12,9 @@ type LastPointPerHost struct { // NewLastPointPerHost returns a new LastPointPerHost for given paremeters func NewLastPointPerHost(core utils.DevopsGenerator) utils.QueryFiller { - return &LastPointPerHost{core} + return &LastPointPerHost{ + core, + } } // Fill fills in the query.Query with query details diff --git a/cmd/tsbs_generate_queries/utils/time_interval.go b/cmd/tsbs_generate_queries/utils/time_interval.go index ef7e948cb..9687a29f0 100644 --- a/cmd/tsbs_generate_queries/utils/time_interval.go +++ b/cmd/tsbs_generate_queries/utils/time_interval.go @@ -7,7 +7,8 @@ import ( // TimeInterval represents an interval of time. type TimeInterval struct { - Start, End time.Time + Start time.Time + End time.Time } // NewTimeInterval constructs a TimeInterval. 
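A note for downstream scripts before the loader files below: this patch renames the query generator's -scale-var flag to -scale and adds a -clickhouse-use-tags flag (default true, mirroring -timescale-use-tags). A hedged sketch of a post-patch invocation; the query-type value is illustrative and must be one of the keys registered in useCaseMatrix:

    # before this patch: tsbs_generate_queries ... -scale-var 100 ...
    # after this patch:
    tsbs_generate_queries -format clickhouse -use-case devops -query-type lastpoint \
        -scale 100 -queries 1000 -clickhouse-use-tags=true > /tmp/queries_clickhouse
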
diff --git a/cmd/tsbs_load_clickhouse/creator.go b/cmd/tsbs_load_clickhouse/creator.go
new file mode 100644
index 000000000..cbbb724a4
--- /dev/null
+++ b/cmd/tsbs_load_clickhouse/creator.go
@@ -0,0 +1,234 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"strings"
+
+	"github.com/jmoiron/sqlx"
+)
+
+// loader.DBCreator interface implementation
+type dbCreator struct {
+	tags    string
+	cols    []string
+	connStr string
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) Init() {
+	br := loader.GetBufferedReader()
+	d.readDataHeader(br)
+}
+
+// readDataHeader fills the dbCreator struct with the data structure (tables description)
+// specified at the beginning of the data file
+func (d *dbCreator) readDataHeader(br *bufio.Reader) {
+	// The first N lines are the header, with the first line containing: "tags:comma-separated list of tags"
+	// the second through N-1 lines containing: "table name: comma-separated list of column names"
+	// and the last line being blank to separate the header from the data
+	//
+	// Ex.:
+	//tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment
+	//cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice
+	//disk,total,free,used,used_percent,inodes_total,inodes_free,inodes_used
+	//nginx,accepts,active,handled,reading,requests,waiting,writing
+
+	i := 0
+	for {
+		var err error
+		var line string
+
+		if i == 0 {
+			// read first line - list of tags
+			d.tags, err = br.ReadString('\n')
+			if err != nil {
+				fatal("input has wrong header format: %v", err)
+			}
+			d.tags = strings.TrimSpace(d.tags)
+		} else {
+			// read the second and further lines - metrics descriptions
+			line, err = br.ReadString('\n')
+			if err != nil {
+				fatal("input has wrong header format: %v", err)
+			}
+			line = strings.TrimSpace(line)
+			if len(line) == 0 {
+				// empty line - end of header
+				break
+			}
+			// append new table/columns set to the list of tables/columns sets
+			d.cols = append(d.cols, line)
+		}
+		i++
+	}
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) DBExists(dbName string) bool {
+	db := sqlx.MustConnect(dbType, getConnectString(false))
+	defer db.Close()
+
+	sql := "SELECT name, engine FROM system.databases WHERE name = '" + dbName + "'"
+	if debug > 0 {
+		fmt.Printf(sql)
+	}
+	var rows []struct {
+		Name   string `db:"name"`
+		Engine string `db:"engine"`
+	}
+
+	err := db.Select(&rows, sql)
+	if err != nil {
+		panic(err)
+	}
+	for _, row := range rows {
+		if row.Name == dbName {
+			return true
+		}
+	}
+
+	return false
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) RemoveOldDB(dbName string) error {
+	db := sqlx.MustConnect(dbType, getConnectString(false))
+	defer db.Close()
+
+	sql := "DROP DATABASE IF EXISTS " + dbName
+	if _, err := db.Exec(sql); err != nil {
+		panic(err)
+	}
+	return nil
+}
+
+// loader.DBCreator interface implementation
+func (d *dbCreator) CreateDB(dbName string) error {
+	// Connect to ClickHouse in general and CREATE DATABASE
+	db := sqlx.MustConnect(dbType, getConnectString(false))
+	sql := "CREATE DATABASE " + dbName
+	_, err := db.Exec(sql)
+	if err != nil {
+		panic(err)
+	}
+	db.Close()
+	db = nil
+
+	// Connect to the specified database within ClickHouse
+	db = sqlx.MustConnect(dbType, getConnectString(true))
+	defer db.Close()
+
+	// d.tags content:
+	//tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment
+	//
+	// Parts would contain
+	// 0: tags - reserved word - tags mark
+	// 1:
+
+
+// CreateDB implements loader.DBCreator
+func (d *dbCreator) CreateDB(dbName string) error {
+    // Connect to ClickHouse in general and CREATE DATABASE
+    db := sqlx.MustConnect(dbType, getConnectString(false))
+    sql := "CREATE DATABASE " + dbName
+    _, err := db.Exec(sql)
+    if err != nil {
+        panic(err)
+    }
+    db.Close()
+    db = nil
+
+    // Connect to the specified database within ClickHouse
+    db = sqlx.MustConnect(dbType, getConnectString(true))
+    defer db.Close()
+
+    // d.tags content:
+    //tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment
+    //
+    // parts[0] is the reserved word 'tags' marking the line;
+    // parts[1:] are the actual tag names, so we use parts[1:] for the tags specification
+    parts := strings.Split(strings.TrimSpace(d.tags), ",")
+    if parts[0] != "tags" {
+        return fmt.Errorf("input header in wrong format. got '%s', expected 'tags'", parts[0])
+    }
+    createTagsTable(db, parts[1:])
+    tableCols["tags"] = parts[1:]
+
+    // Each line of d.cols describes one table's metrics, e.g.:
+    //cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice
+    //disk,total,free,used,used_percent,inodes_total,inodes_free,inodes_used
+    //nginx,accepts,active,handled,reading,requests,waiting,writing
+    for _, cols := range d.cols {
+        // parts[0] is the table name (ex.: cpu, disk, nginx);
+        // parts[1:] are the table's columns
+        parts = strings.Split(strings.TrimSpace(cols), ",")
+        tableName := parts[0]
+        tableCols[tableName] = parts[1:]
+
+        partitioningField := tableCols["tags"][0] // would be 'hostname'
+
+        pseudoCols := []string{}
+        if inTableTag {
+            pseudoCols = append(pseudoCols, partitioningField)
+        }
+
+        fieldDef := []string{}
+        pseudoCols = append(pseudoCols, parts[1:]...)
+        for _, field := range pseudoCols {
+            if len(field) == 0 {
+                continue
+            }
+            fieldDef = append(fieldDef, fmt.Sprintf("%s Float64", field))
+        }
+
+        sql := fmt.Sprintf(`
+            CREATE TABLE %s (
+                created_date Date DEFAULT today(),
+                created_at DateTime DEFAULT now(),
+                time String,
+                tags_id UInt32,
+                %s,
+                additional_tags String DEFAULT ''
+            ) ENGINE = MergeTree(created_date, (tags_id, created_at), 8192)`,
+            tableName,
+            strings.Join(fieldDef, ","))
+        if debug > 0 {
+            fmt.Println(sql)
+        }
+        _, err := db.Exec(sql)
+        if err != nil {
+            panic(err)
+        }
+    }
+
+    return nil
+}
+
+// createTagsTable builds the CREATE TABLE statement for the tags table and runs it
+func createTagsTable(db *sqlx.DB, tags []string) {
+    // prepare the COLUMN specification for the CREATE TABLE statement;
+    // all tag columns are of type String
+    cols := strings.Join(tags, " String,\n")
+    cols += " String\n"
+
+    // the tags table is ordered (indexed) by id
+    index := "id"
+
+    sql := fmt.Sprintf(`
+        CREATE TABLE tags(
+            created_date Date DEFAULT today(),
+            created_at DateTime DEFAULT now(),
+            id UInt32,
+            %s
+        ) ENGINE = MergeTree(created_date, (%s), 8192)
+        `,
+        cols,
+        index)
+    if debug > 0 {
+        fmt.Println(sql)
+    }
+    _, err := db.Exec(sql)
+    if err != nil {
+        panic(err)
+    }
+}
+
+// getConnectString builds the connect string to ClickHouse;
+// db controls whether the database name is included in the connection string
+func getConnectString(db bool) string {
+    // connectString: tcp://127.0.0.1:9000?debug=true
+    // ClickHouse ex.:
+    // tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
+    if db {
+        return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", host, user, password, loader.DatabaseName())
+    }
+    return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s", host, user, password)
+}
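For orientation, a minimal sketch of how a connect string produced above can be used with the kshvakov/clickhouse driver through sqlx (the driver is registered by the blank import in process.go; the query shown is only an illustration):

    db := sqlx.MustConnect("clickhouse", getConnectString(true))
    defer db.Close()

    var version string
    if err := db.Get(&version, "SELECT version()"); err != nil {
        panic(err)
    }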
diff --git a/cmd/tsbs_load_clickhouse/creator_test.go b/cmd/tsbs_load_clickhouse/creator_test.go
new file mode 100644
index 000000000..6e1c1008f
--- /dev/null
+++ b/cmd/tsbs_load_clickhouse/creator_test.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+    "bufio"
+    "bytes"
+    "log"
+    "testing"
+)
+
+func TestDBCreatorReadDataHeader(t *testing.T) {
+    cases := []struct {
+        desc         string
+        input        string
+        wantTags     string
+        wantCols     []string
+        wantBuffered int
+        shouldFatal  bool
+    }{
+        {
+            desc:         "min case: exactly three lines",
+            input:        "tags,tag1,tag2\ncols,col1,col2\n\n",
+            wantTags:     "tags,tag1,tag2",
+            wantCols:     []string{"cols,col1,col2"},
+            wantBuffered: 0,
+        },
+        {
+            desc:         "min case: header plus extra data lines",
+            input:        "tags,tag1,tag2\ncols,col1,col2\n\nrow1\nrow2\n",
+            wantTags:     "tags,tag1,tag2",
+            wantCols:     []string{"cols,col1,col2"},
+            wantBuffered: len([]byte("row1\nrow2\n")),
+        },
+        {
+            desc:         "multiple tables: more than 3 header lines",
+            input:        "tags,tag1,tag2\ncols,col1,col2\ncols2,col21,col22\n\n",
+            wantTags:     "tags,tag1,tag2",
+            wantCols:     []string{"cols,col1,col2", "cols2,col21,col22"},
+            wantBuffered: 0,
+        },
+        {
+            desc:         "multiple tables: more than 3 header lines w/ extra data",
+            input:        "tags,tag1,tag2\ncols,col1,col2\ncols2,col21,col22\n\nrow1\nrow2\n",
+            wantTags:     "tags,tag1,tag2",
+            wantCols:     []string{"cols,col1,col2", "cols2,col21,col22"},
+            wantBuffered: len([]byte("row1\nrow2\n")),
+        },
+        {
+            desc:        "too few lines",
+            input:       "tags\ncols\n",
+            shouldFatal: true,
+        },
+        {
+            desc:        "no line ender",
+            input:       "tags",
+            shouldFatal: true,
+        },
+    }
+
+    for _, c := range cases {
+        dbc := &dbCreator{}
+        br := bufio.NewReader(bytes.NewReader([]byte(c.input)))
+        if c.shouldFatal {
+            // temporarily replace fatal so the test can observe the call
+            oldFatal := fatal
+            isCalled := false
+            fatal = func(fmt string, args ...interface{}) {
+                isCalled = true
+                log.Printf(fmt, args...)
+            }
+            dbc.readDataHeader(br)
+            if !isCalled {
+                t.Errorf("%s: expected fatal to be called, but it was not", c.desc)
+            }
+            fatal = oldFatal
+        } else {
+            dbc.readDataHeader(br)
+            if dbc.tags != c.wantTags {
+                t.Errorf("%s: incorrect tags: got\n%s\nwant\n%s", c.desc, dbc.tags, c.wantTags)
+            }
+            if len(dbc.cols) != len(c.wantCols) {
+                t.Errorf("%s: incorrect cols len: got %d want %d", c.desc, len(dbc.cols), len(c.wantCols))
+            }
+            for i := range dbc.cols {
+                if got := dbc.cols[i]; got != c.wantCols[i] {
+                    t.Errorf("%s: cols row %d incorrect: got\n%s\nwant\n%s\n", c.desc, i, got, c.wantCols[i])
+                }
+            }
+            if br.Buffered() != c.wantBuffered {
+                t.Errorf("%s: incorrect amt buffered: got\n%d\nwant\n%d", c.desc, br.Buffered(), c.wantBuffered)
+            }
+        }
+    }
+}
diff --git a/cmd/tsbs_load_clickhouse/main.go b/cmd/tsbs_load_clickhouse/main.go
new file mode 100644
index 000000000..653c329bc
--- /dev/null
+++ b/cmd/tsbs_load_clickhouse/main.go
@@ -0,0 +1,115 @@
+// tsbs_load_clickhouse loads a ClickHouse instance with data from stdin.
+//
+// If the database exists beforehand, it will be *DROPPED*.
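+//
+// A hypothetical invocation (--host, --user, and --password are defined in
+// this file; other flags, such as --workers, come from load.BenchmarkRunner):
+//
+//    tsbs_load_clickhouse --host=localhost --user=default --workers=8 < data.file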
+package main
+
+import (
+    "bufio"
+    "flag"
+    "log"
+
+    "github.com/timescale/tsbs/load"
+)
+
+const (
+    dbType       = "clickhouse"
+    timeValueIdx = "TIME-VALUE"
+    valueTimeIdx = "VALUE-TIME"
+)
+
+// Program option vars:
+var (
+    host     string
+    user     string
+    password string
+
+    logBatches  bool
+    inTableTag  bool
+    hashWorkers bool
+
+    profileFile string
+
+    debug int
+)
+
+// insertData holds the string representations of the tags and fields to insert
+type insertData struct {
+    tags   string // hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production
+    fields string // 1451606400000000000,58,2,24,61,22,63,6,44,80,38
+}
+
+// Global vars
+var (
+    loader    *load.BenchmarkRunner
+    tableCols map[string][]string
+)
+
+// fatal is a variable so that tests can replace it
+var fatal = log.Fatalf
+
+// Parse args:
+func init() {
+    loader = load.GetBenchmarkRunner()
+
+    flag.StringVar(&host, "host", "localhost", "Hostname of the ClickHouse instance")
+    flag.StringVar(&user, "user", "default", "User to connect to ClickHouse as")
+    flag.StringVar(&password, "password", "", "Password to connect to ClickHouse with")
+
+    flag.BoolVar(&logBatches, "log-batches", false, "Whether to time individual batches.")
+
+    // TODO: this flag could be a string/enum with options besides no-hash and round-robin
+    flag.BoolVar(&hashWorkers, "hash-workers", false, "Whether to consistently hash insert data to the same workers (i.e., the data for a particular host always goes to the same worker)")
+    flag.StringVar(&profileFile, "write-profile", "", "File to output CPU/memory profile to")
+
+    flag.IntVar(&debug, "debug", 0, "Debug printing (choices: 0, 1, 2). (default 0)")
+
+    flag.Parse()
+    tableCols = make(map[string][]string)
+}
+
+// benchmark implements the loader.Benchmark interface
+type benchmark struct{}
+
+// GetPointDecoder implements loader.Benchmark
+func (b *benchmark) GetPointDecoder(br *bufio.Reader) load.PointDecoder {
+    return &decoder{
+        scanner: bufio.NewScanner(br),
+    }
+}
+
+// GetBatchFactory implements loader.Benchmark
+func (b *benchmark) GetBatchFactory() load.BatchFactory {
+    return &factory{}
+}
+
+// GetPointIndexer implements loader.Benchmark
+func (b *benchmark) GetPointIndexer(maxPartitions uint) load.PointIndexer {
+    if hashWorkers {
+        return &hostnameIndexer{
+            partitions: maxPartitions,
+        }
+    }
+    return &load.ConstantIndexer{}
+}
+
+// GetProcessor implements loader.Benchmark
+func (b *benchmark) GetProcessor() load.Processor {
+    return &processor{}
+}
+
+// GetDBCreator implements loader.Benchmark
+func (b *benchmark) GetDBCreator() load.DBCreator {
+    return &dbCreator{}
+}
+
+func main() {
+    // If specified, generate a CPU/memory profile
+    if len(profileFile) > 0 {
+        go profileCPUAndMem(profileFile)
+    }
+
+    if hashWorkers {
+        loader.RunBenchmark(&benchmark{}, load.WorkerPerQueue)
+    } else {
+        loader.RunBenchmark(&benchmark{}, load.SingleQueue)
+    }
+}
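When --hash-workers is set, GetPointIndexer returns the hostnameIndexer defined in scan.go of this patch. Conceptually the indexer reduces a hash of the point's hostname modulo the partition count, along these lines (a sketch under that assumption, not the actual implementation):

    h := fnv.New32a()                        // hash/fnv from the standard library
    h.Write([]byte(hostname))
    worker := h.Sum32() % uint32(partitions) // same hostname -> same worker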
diff --git a/cmd/tsbs_load_clickhouse/main_test.go b/cmd/tsbs_load_clickhouse/main_test.go
new file mode 100644
index 000000000..9e10e5c0b
--- /dev/null
+++ b/cmd/tsbs_load_clickhouse/main_test.go
@@ -0,0 +1,22 @@
+package main
+
+import (
+    "fmt"
+    "testing"
+)
+
+func TestGetConnectString(t *testing.T) {
+    wantHost := "localhost"
+    wantUser := "default"
+    wantPassword := ""
+    wantDB := "benchmark"
+    want := fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", wantHost, wantUser, wantPassword, wantDB)
+
+    host = wantHost
+    user = wantUser
+    password = wantPassword
+    connStr := getConnectString(true)
+    if connStr != want {
+        t.Errorf("incorrect connect string: got %s want %s", connStr, want)
+    }
+}
diff --git a/cmd/tsbs_load_clickhouse/process.go b/cmd/tsbs_load_clickhouse/process.go
new file mode 100644
index 000000000..574a67514
--- /dev/null
+++ b/cmd/tsbs_load_clickhouse/process.go
@@ -0,0 +1,369 @@
+package main
+
+import (
+    "fmt"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/jmoiron/sqlx"
+    _ "github.com/kshvakov/clickhouse"
+    "github.com/timescale/tsbs/load"
+)
+
+// syncCSI guards the hostname -> tags.id mapping shared between workers
+type syncCSI struct {
+    // m maps a hostname to the tags.id for that host
+    m     map[string]int64
+    mutex *sync.RWMutex
+}
+
+func newSyncCSI() *syncCSI {
+    return &syncCSI{
+        m:     make(map[string]int64),
+        mutex: &sync.RWMutex{},
+    }
+}
+
+// globalSyncCSI is used when data is not consistently hashed to a worker;
+// in that case all workers must share the same hostname -> tags_id map
+var globalSyncCSI = newSyncCSI()
+
+// subsystemTagsToJSON converts a list of key=value pairs, e.g.
+//    a=b
+//    c=d
+// into the JSON string '{"a": "b", "c": "d"}'
+func subsystemTagsToJSON(tags []string) string {
+    json := "{"
+    for i, t := range tags {
+        // SplitN keeps values that themselves contain '=' intact
+        args := strings.SplitN(t, "=", 2)
+        if i > 0 {
+            json += ","
+        }
+        json += fmt.Sprintf("\"%s\": \"%s\"", args[0], args[1])
+    }
+    json += "}"
+    return json
+}
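+
+// For example, subsystemTagsToJSON([]string{"arch=x86", "os=Ubuntu16.10"})
+// returns the string:
+//
+//    {"arch": "x86", "os": "Ubuntu16.10"}
+//
+// Keys and values are not JSON-escaped; tag data is assumed to contain no
+// quotes or backslashes.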
+
+// insertTags fills the tags table with values
+func insertTags(db *sqlx.DB, startId int, tagRows [][]string, returnResults bool) map[string]int64 {
+    // Map hostname to tags_id
+    ret := make(map[string]int64)
+
+    // The tags table structure is (see createTagsTable in creator.go):
+    // CREATE TABLE tags(
+    //     created_date Date DEFAULT today(),
+    //     created_at DateTime DEFAULT now(),
+    //     id UInt32,
+    //     ... one String column per tag ...
+    // ) ENGINE = MergeTree(created_date, (id), 8192)
+
+    // Build an insert-multiple-rows INSERT statement like:
+    // INSERT INTO tags (
+    //     ... list of column names ...
+    // ) VALUES
+    // ( ... row 1 values ... ),
+    // ( ... row 2 values ... ),
+    // ...
+    // ( ... row N values ... )
+
+    // Columns. Ex.:
+    // hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment
+    cols := tableCols["tags"]
+    // Add the id column to the prepared statement
+    sql := fmt.Sprintf(`
+        INSERT INTO tags(
+            id,%s
+        ) VALUES (
+            ?%s
+        )`,
+        strings.Join(cols, ","),
+        strings.Repeat(",?", len(cols)))
+    if debug > 0 {
+        fmt.Println(sql)
+    }
+
+    // Insert tags row-by-row within a single transaction;
+    // the ClickHouse driver accumulates all rows of a transaction into one batch
+    tx, err := db.Begin()
+    if err != nil {
+        panic(err)
+    }
+    stmt, err := tx.Prepare(sql)
+    if err != nil {
+        panic(err)
+    }
+    defer stmt.Close()
+
+    id := startId
+    for _, tagRow := range tagRows {
+        // id of the new tag
+        id++
+
+        // A []string cannot be passed directly to a variadic ...interface{}
+        // parameter, so expand it into a []interface{} first; for details see:
+        // https://blog.learngoprogramming.com/golang-variadic-funcs-how-to-patterns-369408f19085
+        // The +1 is for the id value placed at the beginning
+        args := make([]interface{}, len(tagRow)+1)
+        args[0] = id
+        for i, value := range tagRow {
+            args[i+1] = value
+        }
+
+        _, err := stmt.Exec(args...)
+        if err != nil {
+            panic(err)
+        }
+
+        if returnResults {
+            // Fill the map hostname -> tags_id
+            ret[tagRow[0]] = int64(id)
+        }
+    }
+
+    err = tx.Commit()
+    if err != nil {
+        panic(err)
+    }
+
+    if returnResults {
+        return ret
+    }
+
+    return nil
+}
+
+// processCSI inserts one table's part of the incoming batch into the database
+func (p *processor) processCSI(tableName string, rows []*insertData) uint64 {
+    tagRows := make([][]string, 0, len(rows))
+    dataRows := make([][]interface{}, 0, len(rows))
+    ret := uint64(0)
+    commonTagsLen := len(tableCols["tags"])
+
+    colLen := len(tableCols[tableName]) + 2
+    if inTableTag {
+        colLen++
+    }
+
+    tagsIdPosition := 0
+
+    for _, data := range rows {
+        // Split the tags into individual common tags and
+        // an extra bit leftover for non-common tags that need to be added separately.
+        // For each of the common tags, remove everything after = in the form