Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add VictoriaMetrics support #96

Merged
merged 15 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Current databases supported:
+ MongoDB [(supplemental docs)](docs/mongo.md)
+ SiriDB [(supplemental docs)](docs/siridb.md)
+ TimescaleDB [(supplemental docs)](docs/timescaledb.md)
+ VictoriaMetrics [(supplemental docs)](docs/victoriametrics.md)

## Overview

Expand Down Expand Up @@ -75,8 +76,10 @@ cases are implemented for each database:
|MongoDB|X|
|SiriDB|X|
|TimescaleDB|X|X|
|VictoriaMetrics|X²||

¹ Does not support the `groupby-orderby-limit` query
² Does not support the `groupby-orderby-limit`, `lastpoint`, `high-cpu-1`, `high-cpu-all` queries

## What the TSBS tests

Expand Down Expand Up @@ -140,7 +143,7 @@ Variables needed:
1. how much time should be between each reading per device, in seconds. E.g., `10s`
1. and which database(s) you want to generate for. E.g., `timescaledb`
(choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `mongo`, `siridb`,
or `timescaledb`)
`timescaledb` or `victoriametrics`)

Given the above steps you can now generate a dataset (or multiple
datasets, if you chose to generate for multiple databases) that can
Expand Down
1 change: 1 addition & 0 deletions cmd/tsbs_generate_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// InfluxDB bulk load format
// MongoDB BSON format
// TimescaleDB pseudo-CSV format (the same as for ClickHouse)
// VictoriaMetrics bulk load format (the same as for InfluxDB)

// Supported use cases:
// devops: scale is the number of hosts to simulate, with log messages
Expand Down
63 changes: 63 additions & 0 deletions cmd/tsbs_generate_queries/databases/victoriametrics/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package victoriametrics

import (
"fmt"
"net/url"
"strconv"
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
iutils "github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/query"
)

// BaseGenerator contains settings specific for the VictoriaMetrics devops
// query generators; it carries no state of its own.
type BaseGenerator struct{}

// GenerateEmptyQuery returns an empty query.HTTP.
func (g *BaseGenerator) GenerateEmptyQuery() query.Query {
	httpQuery := query.NewHTTP()
	return httpQuery
}

// NewDevops creates a new devops use case query generator covering the
// time range [start, end) at the given scale (number of hosts).
func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) {
	c, err := devops.NewCore(start, end, scale)
	if err != nil {
		return nil, err
	}
	gen := &Devops{
		BaseGenerator: g,
		Core:          c,
	}
	return gen, nil
}

// queryInfo bundles everything needed to build one /api/v1/query_range
// HTTP request against VictoriaMetrics.
type queryInfo struct {
	// query is the PromQL expression to execute.
	query string
	// label is a short human-readable name for the query type.
	label string
	// desc describes the query type in more detail.
	desc string
	// interval is the time range the query executes over.
	interval *iutils.TimeInterval
	// step is the range-query resolution, in seconds.
	step string
}

// fillInQuery fills the given query.HTTP with the GET method, the
// /api/v1/query_range path (query, start, end, step parameters), and
// human-readable metadata taken from qi.
func (g *BaseGenerator) fillInQuery(qq query.Query, qi *queryInfo) {
	q := qq.(*query.HTTP)
	q.HumanLabel = []byte(qi.label)
	q.Method = []byte("GET")

	v := url.Values{}
	v.Set("query", qi.query)
	v.Set("step", qi.step)
	// Guard every interval dereference, not just the description: the
	// original checked qi.interval for nil here but then dereferenced it
	// unconditionally below, a latent nil-pointer panic.
	if qi.interval != nil {
		q.HumanDescription = []byte(fmt.Sprintf("%s: %s", qi.label, qi.interval.StartString()))
		// Prometheus range API expects Unix seconds; TimeInterval stores ns.
		v.Set("start", strconv.FormatInt(qi.interval.StartUnixNano()/1e9, 10))
		v.Set("end", strconv.FormatInt(qi.interval.EndUnixNano()/1e9, 10))
	}
	// url.Values.Encode sorts by key, so the reordered Set calls above
	// produce the exact same encoded path as before.
	q.Path = []byte(fmt.Sprintf("/api/v1/query_range?%s", v.Encode()))
	q.Body = nil
}
141 changes: 141 additions & 0 deletions cmd/tsbs_generate_queries/databases/victoriametrics/devops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package victoriametrics

import (
"fmt"
"strings"
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/query"
)

// Devops produces PromQL queries for all the devops query types.
type Devops struct {
	*BaseGenerator
	// Core supplies the shared devops scale/interval helpers (e.g.
	// GetRandomHosts, Interval).
	*devops.Core
}

// mustGetRandomHosts is the form of GetRandomHosts that cannot error;
// if it does error, it causes a panic.
func (d *Devops) mustGetRandomHosts(nHosts int) []string {
	hostnames, err := d.GetRandomHosts(nHosts)
	if err == nil {
		return hostnames
	}
	panic(err.Error())
}

// GroupByOrderByLimit is not expressible in PromQL; calling it panics.
func (d *Devops) GroupByOrderByLimit(qq query.Query) {
	panic("GroupByOrderByLimit not supported in PromQL")
}

// LastPointPerHost is not expressible in PromQL; calling it panics.
func (d *Devops) LastPointPerHost(qq query.Query) {
	panic("LastPointPerHost not supported in PromQL")
}

// HighCPUForHosts is not expressible in PromQL; calling it panics.
func (d *Devops) HighCPUForHosts(qq query.Query, nHosts int) {
	panic("HighCPUForHosts not supported in PromQL")
}

// GroupByTime selects the MAX for numMetrics metrics under 'cpu'
// per minute for nhosts hosts,
// e.g. in pseudo-PromQL:
// max(
//    max_over_time(
//        {__name__=~"metric1|metric2...|metricN",hostname=~"hostname1|hostname2...|hostnameN"}[1m]
//    )
// ) by (__name__)
func (d *Devops) GroupByTime(qq query.Query, nHosts, numMetrics int, timeRange time.Duration) {
	selector := getSelectClause(
		mustGetCPUMetricsSlice(numMetrics),
		d.mustGetRandomHosts(nHosts),
	)
	humanLabel := fmt.Sprintf("VictoriaMetrics %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange)
	d.fillInQuery(qq, &queryInfo{
		query:    fmt.Sprintf("max(max_over_time(%s[1m])) by (__name__)", selector),
		label:    humanLabel,
		interval: d.Interval.MustRandWindow(timeRange),
		step:     "60",
	})
}

// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day,
// e.g. in pseudo-PromQL:
//
// avg(
//    avg_over_time(
//        {__name__=~"metric1|metric2...|metricN"}[1h]
//    )
// ) by (__name__, hostname)
//
// Resultsets:
// double-groupby-1
// double-groupby-5
// double-groupby-all
func (d *Devops) GroupByTimeAndPrimaryTag(qq query.Query, numMetrics int) {
	// No host filter here: the query aggregates across every host.
	selector := getSelectClause(mustGetCPUMetricsSlice(numMetrics), nil)
	d.fillInQuery(qq, &queryInfo{
		query:    fmt.Sprintf("avg(avg_over_time(%s[1h])) by (__name__, hostname)", selector),
		label:    devops.GetDoubleGroupByLabel("VictoriaMetrics", numMetrics),
		interval: d.Interval.MustRandWindow(devops.DoubleGroupByDuration),
		step:     "3600",
	})
}

// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts,
// e.g. in pseudo-PromQL:
//
// max(
//    max_over_time(
//        {hostname=~"hostname1|hostname2...|hostnameN"}[1h]
//    )
// ) by (__name__)
func (d *Devops) MaxAllCPU(qq query.Query, nHosts int) {
	selector := getSelectClause(
		devops.GetAllCPUMetrics(),
		d.mustGetRandomHosts(nHosts),
	)
	d.fillInQuery(qq, &queryInfo{
		query:    fmt.Sprintf("max(max_over_time(%s[1h])) by (__name__)", selector),
		label:    devops.GetMaxAllLabel("VictoriaMetrics", nHosts),
		interval: d.Interval.MustRandWindow(devops.MaxAllDuration),
		step:     "3600",
	})
}

// getHostClause returns a PromQL label matcher for the given hostnames:
// empty string for none, an exact match for one, and a regexp
// alternation for several.
func getHostClause(hostnames []string) string {
	switch len(hostnames) {
	case 0:
		return ""
	case 1:
		return fmt.Sprintf("hostname='%s'", hostnames[0])
	default:
		return fmt.Sprintf("hostname=~'%s'", strings.Join(hostnames, "|"))
	}
}

// getSelectClause builds the PromQL series selector for the given CPU
// metrics (without the "cpu_" prefix), optionally restricted to hosts.
func getSelectClause(metrics, hosts []string) string {
	if len(metrics) == 0 {
		panic("BUG: must be at least one metric name in clause")
	}

	hostsClause := getHostClause(hosts)
	if len(metrics) == 1 {
		// Single metric: name it directly instead of matching __name__.
		return fmt.Sprintf("cpu_%s{%s}", metrics[0], hostsClause)
	}

	metricsClause := strings.Join(metrics, "|")
	if len(hosts) == 0 {
		return fmt.Sprintf("{__name__=~'cpu_(%s)'}", metricsClause)
	}
	return fmt.Sprintf("{__name__=~'cpu_(%s)', %s}", metricsClause, hostsClause)
}

// mustGetCPUMetricsSlice is the form of GetCPUMetricsSlice that cannot
// error; if it does error, it causes a panic.
func mustGetCPUMetricsSlice(numMetrics int) []string {
	names, err := devops.GetCPUMetricsSlice(numMetrics)
	if err == nil {
		return names
	}
	panic(err.Error())
}
130 changes: 130 additions & 0 deletions cmd/tsbs_generate_queries/databases/victoriametrics/devops_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package victoriametrics

import (
"math/rand"
"net/http"
"net/url"
"testing"
"time"

"github.com/timescale/tsbs/query"
)

// Test_what exercises every Devops query-generation method: supported
// methods must emit the expected PromQL expression and step, while
// unsupported methods and invalid arguments must panic.
func Test_what(t *testing.T) {
	testCases := map[string]struct {
		fn        func(g *Devops, q *query.HTTP)
		expQuery  string
		expStep   string
		expToFail bool
	}{
		"GroupByTime_1_1": {
			fn: func(g *Devops, q *query.HTTP) {
				g.GroupByTime(q, 1, 1, time.Hour)
			},
			expQuery: "max(max_over_time(cpu_usage_user{hostname='host_5'}[1m])) by (__name__)",
			expStep:  "60",
		},
		"GroupByTime_5_1": {
			fn: func(g *Devops, q *query.HTTP) {
				g.GroupByTime(q, 5, 1, time.Hour)
			},
			expQuery: "max(max_over_time(cpu_usage_user{hostname=~'host_5|host_9|host_3|host_1|host_7'}[1m])) by (__name__)",
			expStep:  "60",
		},
		"GroupByTime_5_5": {
			fn: func(g *Devops, q *query.HTTP) {
				g.GroupByTime(q, 5, 5, time.Hour)
			},
			expQuery: "max(max_over_time({__name__=~'cpu_(usage_user|usage_system|usage_idle|usage_nice|usage_iowait)', hostname=~'host_5|host_9|host_3|host_1|host_7'}[1m])) by (__name__)",
			expStep:  "60",
		},
		"GroupByTimeAndPrimaryTag": {
			fn: func(g *Devops, q *query.HTTP) {
				g.GroupByTimeAndPrimaryTag(q, 5)
			},
			expQuery: "avg(avg_over_time({__name__=~'cpu_(usage_user|usage_system|usage_idle|usage_nice|usage_iowait)'}[1h])) by (__name__, hostname)",
			expStep:  "3600",
		},
		"MaxAllCPU": {
			fn: func(g *Devops, q *query.HTTP) {
				g.MaxAllCPU(q, 5)
			},
			expQuery: "max(max_over_time({__name__=~'cpu_(usage_user|usage_system|usage_idle|usage_nice|usage_iowait|usage_irq|usage_softirq|usage_steal|usage_guest|usage_guest_nice)', hostname=~'host_5|host_9|host_3|host_1|host_7'}[1h])) by (__name__)",
			expStep:  "3600",
		},
		"GroupByOrderByLimit": {
			fn: func(g *Devops, q *query.HTTP) {
				g.GroupByOrderByLimit(q)
			},
			expToFail: true,
		},
		"LastPointPerHost": {
			fn: func(g *Devops, q *query.HTTP) {
				g.LastPointPerHost(q)
			},
			expToFail: true,
		},
		"HighCPUForHosts": {
			fn: func(g *Devops, q *query.HTTP) {
				g.HighCPUForHosts(q, 6)
			},
			expToFail: true,
		},
		"GroupByTime_negative_metrics": {
			fn: func(g *Devops, q *query.HTTP) {
				g.GroupByTime(q, 1, -1, time.Hour)
			},
			expToFail: true,
		},
		"GroupByTime_negative_hosts": {
			fn: func(g *Devops, q *query.HTTP) {
				g.GroupByTime(q, -1, 1, time.Hour)
			},
			expToFail: true,
		},
	}
	g := acquireGenerator(t, time.Hour*24, 10)
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			rand.Seed(123) // Setting seed for testing purposes.
			q := g.GenerateEmptyQuery().(*query.HTTP)
			if tc.expToFail {
				// Run in a closure so the deferred recover only covers
				// the call under test.
				func() {
					defer func() {
						if recover() == nil {
							t.Errorf("expected to panic")
						}
					}()
					tc.fn(g, q)
				}()
				return
			}

			tc.fn(g, q)
			// q.Path looks like "/api/v1/query_range?end=...&query=...";
			// ParseQuery tolerates the leading path segment.
			vals, err := url.ParseQuery(string(q.Path))
			if err != nil {
				t.Fatalf("unexpected err while parsing query: %s", err)
			}
			checkEqual(t, "query", tc.expQuery, vals.Get("query"))
			checkEqual(t, "step", tc.expStep, vals.Get("step"))
			checkEqual(t, "method", http.MethodGet, string(q.Method))
		})
	}
}

// checkEqual fails the test when the two strings differ. t.Helper()
// makes the failure report the caller's line rather than this one.
func checkEqual(t *testing.T, name, a, b string) {
	t.Helper()
	if a != b {
		t.Fatalf("values for %q are not equal \na: %q \nb: %q", name, a, b)
	}
}

// acquireGenerator builds a Devops generator covering the window
// [Unix epoch, epoch+interval) at the given scale, failing the test if
// construction errors.
func acquireGenerator(t *testing.T, interval time.Duration, scale int) *Devops {
	start := time.Unix(0, 0)
	gen, err := (&BaseGenerator{}).NewDevops(start, start.Add(interval), scale)
	if err != nil {
		t.Fatalf("Error while creating devops generator")
	}
	return gen.(*Devops)
}
12 changes: 12 additions & 0 deletions cmd/tsbs_load_victoriametrics/creator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

// dbCreator satisfies the loader's database-creation interface.
// VictoriaMetrics doesn't have a database abstraction, so every
// operation is a success-reporting no-op.
type dbCreator struct{}

// Init does nothing; there is no setup to perform.
func (d *dbCreator) Init() {}

// DBExists always reports true; there is no database entity to check.
func (d *dbCreator) DBExists(dbName string) bool {
	return true
}

// CreateDB is a no-op and never fails.
func (d *dbCreator) CreateDB(dbName string) error {
	return nil
}

// RemoveOldDB is a no-op and never fails.
func (d *dbCreator) RemoveOldDB(dbName string) error {
	return nil
}
Loading