Skip to content

Commit

Permalink
Use ReadString for wide events, add options to provide cpu profile
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Nov 26, 2018
1 parent a8c1be1 commit 40345bf
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions tools/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"os"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/xichen2020/eventdb/sharding"
"github.com/xichen2020/eventdb/storage"
"github.com/xichen2020/eventdb/value"
"github.com/xichen2020/eventdb/x/unsafe"

"github.com/m3db/m3cluster/shard"
"github.com/m3db/m3x/log"
Expand All @@ -29,6 +29,7 @@ var (
excludeKeySuffix = flag.String("excludeKeySuffix", "", "excluding keys with given suffix")
numShards = flag.Int("numShards", 8, "number of shards")
numWorkers = flag.Int("numWorkers", 1, "number of workers processing events in parallel")
cpuProfileFile = flag.String("cpuProfileFile", "cpu.profile", "path to CPU profile")

logger = log.SimpleLogger
eventNamespace = []byte("testNamespace")
Expand All @@ -52,11 +53,22 @@ func main() {
if err != nil {
logger.Fatalf("error creating database: %v", err)
}
if err := db.Open(); err != nil {
if err = db.Open(); err != nil {
logger.Fatalf("error opening database: %v", err)
}
defer db.Close()

if len(*cpuProfileFile) > 0 {
f, err := os.Create(*cpuProfileFile)
if err != nil {
logger.Fatalf("could not create CPU profile: %v", err)
}
if err := pprof.StartCPUProfile(f); err != nil {
logger.Fatalf("could not start CPU profile: %v", err)
}
defer pprof.StopCPUProfile()
}

var (
wg sync.WaitGroup
idx int32 = -1
Expand Down Expand Up @@ -88,33 +100,39 @@ func readEvents(fname string) ([]event.Event, int, error) {

var (
parserOpts = parserOptions()
scanner = bufio.NewScanner(f)
reader = bufio.NewReader(f)
events []event.Event
totalBytes int
parseTime time.Duration
readErr error
)
for scanner.Scan() {
eventStr := scanner.Text()
totalBytes += len(eventStr)
for readErr == nil {
var eventBytes []byte
eventBytes, readErr = reader.ReadBytes('\n')
size := len(eventBytes)
if size == 0 {
continue
}
if eventBytes[size-1] == '\n' {
eventBytes = eventBytes[:size-1]
}
totalBytes += len(eventBytes)
parseStart := time.Now()
p := json.NewParser(parserOpts)
v, err := p.Parse(eventStr)
v, err := p.ParseBytes(eventBytes)
parseTime += time.Since(parseStart)
if err != nil {
return nil, 0, fmt.Errorf("error parsing %s: %v", eventStr, err)
return nil, 0, fmt.Errorf("error parsing %s: %v", eventBytes, err)
}
ev := event.Event{
ID: []byte(uuid.NewUUID().String()),
TimeNanos: time.Now().UnixNano(),
FieldIter: value.NewFieldIterator(v),
RawData: unsafe.ToBytes(eventStr),
RawData: eventBytes,
}
events = append(events, ev)
}

if err := scanner.Err(); err != nil {
return nil, 0, fmt.Errorf("error scanning %s: %v", fname, err)
}
logger.Infof("parsing %d events in %v, throughput = %f events / s", len(events), parseTime, float64(len(events))/float64(parseTime)*1e9)
logger.Infof("parsing %d bytes in %v, throughput = %f bytes / s", totalBytes, parseTime, float64(totalBytes)/float64(parseTime)*1e9)
return events, totalBytes, nil
Expand Down

0 comments on commit 40345bf

Please sign in to comment.