Skip to content

Commit

Permalink
Add a feature in/out files with '-file /path/to/file' option
Browse files Browse the repository at this point in the history
Each tool which generates output can write to STDOUT or to a file.
Each tool which reads input can read from STDIN or from a file.
  • Loading branch information
sunsingerus authored and RobAtticus committed Oct 24, 2018
1 parent d9a3fc1 commit cd0377f
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 32 deletions.
27 changes: 24 additions & 3 deletions cmd/tsbs_generate_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
errInvalidGroupsFmt = "incorrect interleaved groups configuration: id %d >= total groups %d"
errInvalidFormatFmt = "invalid format specifier: %v (valid choices: %v)"

inputBufSize = 4 << 20
defaultWriteSize = 4 << 20 // 4 MB
)

// semi-constants
Expand Down Expand Up @@ -85,6 +85,7 @@ var (
interleavedGenerationGroups uint

logInterval time.Duration
fileName string
)

func parseTimeFromString(s string) time.Time {
Expand Down Expand Up @@ -134,6 +135,22 @@ func postFlagParse(flags parseableFlagVars) {
timestampEnd = parseTimeFromString(flags.timestampEndStr)
}

// GetBufferedWriter returns the buffered Writer that should be used for generated output
func GetBufferedWriter(fileName string) *bufio.Writer {
// Prepare output file/STDOUT
if len(fileName) > 0 {
// Write output to file
file, err := os.Create(fileName)
if err != nil {
fatal("cannot open file for write %s: %v", fileName, err)
}
return bufio.NewWriterSize(file, defaultWriteSize)
}

// Write output to STDOUT
return bufio.NewWriterSize(os.Stdout, defaultWriteSize)
}

// Parse args:
func init() {
pfv := parseableFlagVars{}
Expand All @@ -156,6 +173,8 @@ func init() {
flag.StringVar(&profileFile, "profile-file", "", "File to which to write go profiling data")

flag.DurationVar(&logInterval, "log-interval", 10*time.Second, "Duration between host data points")
flag.StringVar(&fileName, "file", "", "File name to write generated data to")

flag.Parse()

postFlagParse(pfv)
Expand All @@ -174,11 +193,13 @@ func main() {
}

rand.Seed(seed)
out := bufio.NewWriterSize(os.Stdout, inputBufSize)

// Get output writer
out := GetBufferedWriter(fileName)
defer func() {
err := out.Flush()
if err != nil {
log.Fatal(err.Error())
fatal(err.Error())
}
}()

Expand Down
54 changes: 41 additions & 13 deletions cmd/tsbs_generate_queries/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,17 @@ var useCaseMatrix = map[string]map[string]utils.QueryFillerMaker{
},
}

const defaultWriteSize = 4 << 20 // 4 MB

// Program option vars:
var (
fatal = log.Fatalf

generator utils.DevopsGenerator
filler utils.QueryFiller

queryCount int
fileName string

seed int64
debug int
Expand Down Expand Up @@ -77,6 +82,22 @@ func getGenerator(format string, start, end time.Time, scale int) utils.DevopsGe
panic(fmt.Sprintf("no devops generator specified for format '%s'", format))
}

// GetBufferedWriter returns the buffered Writer that should be used for generated output.
// An empty fileName selects STDOUT as the destination; any other value names a file that
// is created (or truncated) and written through a buffer of defaultWriteSize bytes.
func GetBufferedWriter(fileName string) *bufio.Writer {
	// Default destination is STDOUT.
	var dest *os.File = os.Stdout

	if len(fileName) > 0 {
		// A file was requested — create it and use it as the destination.
		f, err := os.Create(fileName)
		if err != nil {
			// NOTE(review): fatal normally exits the program; if replaced in tests,
			// execution continues with dest still pointing at STDOUT.
			fatal("cannot open file for write %s: %v", fileName, err)
		} else {
			dest = f
		}
	}

	return bufio.NewWriterSize(dest, defaultWriteSize)
}

// Parse args:
func init() {
useCaseMatrix["cpu-only"] = useCaseMatrix["devops"]
Expand Down Expand Up @@ -116,18 +137,20 @@ func init() {
flag.UintVar(&interleavedGenerationGroupID, "interleaved-generation-group-id", 0, "Group (0-indexed) to perform round-robin serialization within. Use this to scale up data generation to multiple processes.")
flag.UintVar(&interleavedGenerationGroups, "interleaved-generation-groups", 1, "The number of round-robin serialization groups. Use this to scale up data generation to multiple processes.")

flag.StringVar(&fileName, "file", "", "File name to write generated queries to")

flag.Parse()

if !(interleavedGenerationGroupID < interleavedGenerationGroups) {
log.Fatal("incorrect interleaved groups configuration")
fatal("incorrect interleaved groups configuration")
}

if _, ok := useCaseMatrix[useCase]; !ok {
log.Fatalf("invalid use case specifier: '%s'", useCase)
fatal("invalid use case specifier: '%s'", useCase)
}

if _, ok := useCaseMatrix[useCase][queryType]; !ok {
log.Fatalf("invalid query type specifier: '%s'", queryType)
fatal("invalid query type specifier: '%s'", queryType)
}

// the default seed is the current timestamp:
Expand All @@ -140,12 +163,12 @@ func init() {
var err error
timestampStart, err := time.Parse(time.RFC3339, timestampStartStr)
if err != nil {
log.Fatal(err)
fatal(err.Error())
}
timestampStart = timestampStart.UTC()
timestampEnd, err := time.Parse(time.RFC3339, timestampEndStr)
if err != nil {
log.Fatal(err)
fatal(err.Error())
}
timestampEnd = timestampEnd.UTC()

Expand All @@ -159,9 +182,14 @@ func main() {
// Set up bookkeeping:
stats := make(map[string]int64)

// Set up output buffering:
out := bufio.NewWriter(os.Stdout)
defer out.Flush()
// Get output writer
out := GetBufferedWriter(fileName)
defer func() {
err := out.Flush()
if err != nil {
fatal(err.Error())
}
}()

// Create request instances, serializing them to stdout and collecting
// counts for each kind. If applicable, only prints queries that
Expand All @@ -176,24 +204,24 @@ func main() {
if currentInterleavedGroup == interleavedGenerationGroupID {
err := enc.Encode(q)
if err != nil {
log.Fatal("encoder ", err)
fatal("encoder %v", err)
}
stats[string(q.HumanLabelName())]++

if debug == 1 {
_, err := fmt.Fprintf(os.Stderr, "%s\n", q.HumanLabelName())
if err != nil {
log.Fatal(err)
fatal(err.Error())
}
} else if debug == 2 {
_, err := fmt.Fprintf(os.Stderr, "%s\n", q.HumanDescriptionName())
if err != nil {
log.Fatal(err)
fatal(err.Error())
}
} else if debug >= 3 {
_, err := fmt.Fprintf(os.Stderr, "%s\n", q.String())
if err != nil {
log.Fatal(err)
fatal(err.Error())
}
}
}
Expand All @@ -214,7 +242,7 @@ func main() {
for _, k := range keys {
_, err := fmt.Fprintf(os.Stderr, "%s: %d points\n", k, stats[k])
if err != nil {
log.Fatal(err)
fatal(err.Error())
}
}
}
8 changes: 4 additions & 4 deletions cmd/tsbs_run_queries_timescaledb/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// tsbs_run_queries_timescaledb speed tests TimescaleDB using requests from stdin.
// tsbs_run_queries_timescaledb speed tests TimescaleDB using requests from stdin or file
//
// It reads encoded Query objects from stdin, and makes concurrent requests
// to the provided PostgreSQL/TimescaleDB endpoint. This program has no knowledge of the
// internals of the endpoint.
// It reads encoded Query objects from stdin or file, and makes concurrent requests
// to the provided PostgreSQL/TimescaleDB endpoint.
// This program has no knowledge of the internals of the endpoint.
package main

import (
Expand Down
20 changes: 16 additions & 4 deletions load/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"sync/atomic"
"time"
"log"
)

const (
Expand All @@ -25,7 +26,10 @@ const (
)

// change for more useful testing
var printFn = fmt.Printf
var (
printFn = fmt.Printf
fatal = log.Fatalf
)

// Benchmark is an interface that represents the skeleton of a program
// needed to run an insert or load benchmark.
Expand Down Expand Up @@ -53,7 +57,7 @@ type BenchmarkRunner struct {
doCreateDB bool
doAbortOnExist bool
reportingPeriod time.Duration
filename string // TODO implement file reading
fileName string

// non-flag fields
br *bufio.Reader
Expand Down Expand Up @@ -81,6 +85,7 @@ func GetBenchmarkRunnerWithBatchSize(batchSize uint) *BenchmarkRunner {
flag.BoolVar(&loader.doCreateDB, "do-create-db", true, "Whether to create the database. Disable on all but one client if running on a multi client setup.")
flag.BoolVar(&loader.doAbortOnExist, "do-abort-on-exist", false, "Whether to abort if a database with the given name already exists.")
flag.DurationVar(&loader.reportingPeriod, "reporting-period", 10*time.Second, "Period to report write stats")
flag.StringVar(&loader.fileName, "file", "", "File name to read data from")

return loader
}
Expand Down Expand Up @@ -120,9 +125,16 @@ func (l *BenchmarkRunner) RunBenchmark(b Benchmark, workQueues uint) {
// GetBufferedReader returns the buffered Reader that should be used by the loader
func (l *BenchmarkRunner) GetBufferedReader() *bufio.Reader {
if l.br == nil {
if len(l.filename) > 0 {
l.br = nil // TODO - Support reading from files
if len(l.fileName) > 0 {
// Read from specified file
file, err := os.Open(l.fileName)
if err != nil {
fatal("cannot open file for read %s: %v", l.fileName, err)
return nil
}
l.br = bufio.NewReaderSize(file, defaultReadSize)
} else {
// Read from STDIN
l.br = bufio.NewReaderSize(os.Stdin, defaultReadSize)
}
}
Expand Down
38 changes: 32 additions & 6 deletions load/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,50 @@ func TestGetBufferedReader(t *testing.T) {
if br != nil {
t.Errorf("initial buffered reader is non-nil")
}
// TODO Filename not yet supported
r.filename = "foo"

oldFatal := fatal
fatalCalled := false
fatal = func(format string, args ...interface{}) {
fatalCalled = true
}

// Should give a nil bufio.Reader
fatalCalled = false
r.fileName = "foo"
br = r.GetBufferedReader()
if br != nil {
t.Errorf("filename returned a non-nil buffered reader")
t.Errorf("filename returned not nil buffered reader for unexistent file")
}

if !fatalCalled {
t.Errorf("fatal not called when it should have been")
}

// Should give a non-nil bufio.Reader
fatalCalled = false
r.fileName = "/dev/null"
br = r.GetBufferedReader()
if br == nil {
t.Errorf("filename returned nil buffered reader for /dev/null")
}
// Should give a non-nil bufio.Reader now
r.filename = ""

// Should give a non-nil bufio.Reader
fatalCalled = false
r.fileName = ""
br = r.GetBufferedReader()
if br == nil {
t.Errorf("non-filename returned a nil buffered reader")
t.Errorf("STDOUT returned a nil buffered reader")
}

// Test that it returns same bufio.Reader as before
fatalCalled = false
old := br
br = r.GetBufferedReader()
if br != old {
t.Errorf("different buffered reader returned after previously set")
}

fatal = oldFatal
}

func TestUseDBCreator(t *testing.T) {
Expand Down
27 changes: 25 additions & 2 deletions query/benchmarker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
labelAllQueries = "all queries"
labelColdQueries = "cold queries"
labelWarmQueries = "warm queries"

defaultReadSize = 4 << 20 // 4 MB
)

// BenchmarkRunner contains the common components for running a query benchmarking
Expand All @@ -30,6 +32,9 @@ type BenchmarkRunner struct {
memProfile string
printResponses bool
debug int
fileName string

br *bufio.Reader
}

// NewBenchmarkRunner creates a new instance of BenchmarkRunner which is
Expand All @@ -49,6 +54,7 @@ func NewBenchmarkRunner() *BenchmarkRunner {
flag.BoolVar(&ret.sp.prewarmQueries, "prewarm-queries", false, "Run each query twice in a row so the warm query is guaranteed to be a cache hit")
flag.BoolVar(&ret.printResponses, "print-responses", false, "Pretty print response bodies for correctness checking (default false).")
flag.IntVar(&ret.debug, "debug", 0, "Whether to print debug messages.")
flag.StringVar(&ret.fileName, "file", "", "File name to read queries from")

return ret
}
Expand Down Expand Up @@ -84,6 +90,24 @@ type Processor interface {
ProcessQuery(q Query, isWarm bool) ([]*Stat, error)
}

// GetBufferedReader returns the buffered Reader that should be used by the loader.
// The reader is created lazily on first call and cached on the receiver, so repeated
// calls return the same *bufio.Reader. With a non-empty fileName the named file is
// opened for reading; otherwise STDIN is used.
func (b *BenchmarkRunner) GetBufferedReader() *bufio.Reader {
	// Return the cached reader if one was already built.
	if b.br != nil {
		return b.br
	}

	if b.fileName == "" {
		// No input file configured — read queries from STDIN.
		b.br = bufio.NewReaderSize(os.Stdin, defaultReadSize)
		return b.br
	}

	// Read from the configured file; an unopenable file is a fatal setup error.
	file, err := os.Open(b.fileName)
	if err != nil {
		panic(fmt.Sprintf("cannot open file for read %s: %v", b.fileName, err))
	}
	b.br = bufio.NewReaderSize(file, defaultReadSize)
	return b.br
}

// Run does the bulk of the benchmark execution. It launches a gorountine to track
// stats, creates workers to process queries, read in the input, execute the queries,
// and then does cleanup.
Expand All @@ -107,9 +131,8 @@ func (b *BenchmarkRunner) Run(queryPool *sync.Pool, createFn ProcessorCreate) {
}

// Read in jobs, closing the job channel when done:
input := bufio.NewReaderSize(os.Stdin, 1<<20)
wallStart := time.Now()
b.scanner.setReader(input).scan(queryPool, b.c)
b.scanner.setReader(b.GetBufferedReader()).scan(queryPool, b.c)
close(b.c)

// Block for workers to finish sending requests, closing the stats
Expand Down

0 comments on commit cd0377f

Please sign in to comment.