forked from timescale/tsbs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
152 lines (129 loc) · 3.77 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// tsbs_run_queries_cassandra speed tests Cassandra servers using request
// data from stdin.
//
// It reads encoded HLQuery objects from stdin, and makes concurrent requests
// to the provided Cassandra cluster. This program is a 'heavy client', i.e.
// it builds a client-side index of table metadata before beginning the
// benchmarking.
package main
import (
"fmt"
"log"
"time"
"github.com/blagojts/viper"
"github.com/gocql/gocql"
"github.com/spf13/pflag"
"github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/pkg/query"
)
const (
BucketDuration = 24 * time.Hour
BucketTimeLayout = "2006-01-02"
)
// Blessed tables that hold benchmark data:
var (
BlessedTables = []string{
"series_bigint",
"series_float",
"series_double",
"series_boolean",
"series_blob",
}
)
// Program option vars:
var (
daemonURL string
aggrPlanLabel string
requestTimeout time.Duration
csiTimeout time.Duration
)
// Helpers for choice-like flags:
var (
aggrPlanChoices = map[string]int{
"server": AggrPlanTypeWithServerAggregation,
"client": AggrPlanTypeWithoutServerAggregation,
}
)
// Global vars:
var (
runner *query.BenchmarkRunner
aggrPlan int
csi *ClientSideIndex
session *gocql.Session
)
// Parse args:
func init() {
var config query.BenchmarkRunnerConfig
config.AddToFlagSet(pflag.CommandLine)
pflag.String("host", "localhost:9042", "Cassandra hostname and port combination.")
pflag.String("aggregation-plan", "", "Aggregation plan (choices: server, client)")
pflag.Duration("read-timeout", 1*time.Second, "Maximum request timeout.")
pflag.Duration("client-side-index-timeout", 10*time.Second, "Maximum client-side index timeout (only used at initialization).")
pflag.Parse()
err := utils.SetupConfigFile()
if err != nil {
panic(fmt.Errorf("fatal error config file: %s", err))
}
if err := viper.Unmarshal(&config); err != nil {
panic(fmt.Errorf("unable to decode config: %s", err))
}
daemonURL = viper.GetString("host")
aggrPlanLabel = viper.GetString("aggregation-plan")
requestTimeout = viper.GetDuration("read-timeout")
csiTimeout = viper.GetDuration("client-side-index-timeout")
if _, ok := aggrPlanChoices[aggrPlanLabel]; !ok {
log.Fatal("invalid aggregation plan")
}
aggrPlan = aggrPlanChoices[aggrPlanLabel]
runner = query.NewBenchmarkRunner(config)
}
func main() {
// Make client-side index:
session = NewCassandraSession(daemonURL, runner.DatabaseName(), csiTimeout)
csi = NewClientSideIndex(FetchSeriesCollection(session))
session.Close()
// Make database connection pool:
session = NewCassandraSession(daemonURL, runner.DatabaseName(), requestTimeout)
defer session.Close()
runner.Run(&query.CassandraPool, newProcessor)
}
type processor struct {
qe *HLQueryExecutor
opts *HLQueryExecutorDoOptions
}
func newProcessor() query.Processor { return &processor{} }
func (p *processor) Init(workerNumber int) {
p.opts = &HLQueryExecutorDoOptions{
AggregationPlan: aggrPlan,
Debug: runner.DebugLevel(),
PrettyPrintResponses: runner.DoPrintResponses(),
}
p.qe = NewHLQueryExecutor(session, csi, runner.DebugLevel())
}
func (p *processor) ProcessQuery(q query.Query, isWarm bool) ([]*query.Stat, error) {
cq := q.(*query.Cassandra)
hlq := &HLQuery{*cq}
hlq.ForceUTC()
labels := [][]byte{
q.HumanLabelName(),
append(q.HumanLabelName(), "-qp"...),
append(q.HumanLabelName(), "-req"...),
}
if isWarm {
for i, l := range labels {
labels[i] = append(l, " (warm)"...)
}
}
qpLagMs, reqLagMs, err := p.qe.Do(hlq, *p.opts)
if err != nil {
return nil, err
}
// total stat
totalMs := qpLagMs + reqLagMs
stats := []*query.Stat{
query.GetPartialStat().Init(labels[1], qpLagMs),
query.GetPartialStat().Init(labels[2], reqLagMs),
query.GetStat().Init(labels[0], totalMs),
}
return stats, nil
}