Skip to content

Commit

Permalink
[bench] Xephon-K loader is working #12
Browse files Browse the repository at this point in the history
- simply add a serializer struct and it goes on /w/, didn't meet
any issue with http client yet, but I think the idle connect may
need to be modified if I increase the number of workers
  • Loading branch information
at15 committed Mar 18, 2017
1 parent 22a4822 commit 9a81c05
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 4 deletions.
7 changes: 7 additions & 0 deletions pkg/bench/loader/http_loader.go
Expand Up @@ -28,6 +28,13 @@ func (loader *HTTPLoader) Run() {
return
}
baseReq = req
case bench.DBXephonK:
req, err := http.NewRequest("POST", "http://localhost:8080/write", nil)
if err != nil {
log.Panic(err)
return
}
baseReq = req
default:
log.Panic("unsupported database, no base request avaliable")
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/bench/loader/http_worker.go
Expand Up @@ -35,6 +35,8 @@ func (worker *HTTPWorker) work() {
switch worker.config.TargetDB {
case bench.DBInfluxDB:
serializer = &serialize.InfluxDBSerialize{}
case bench.DBXephonK:
serializer = &serialize.XephonKSerialize{}
default:
log.Panic("unsupported database, not serailizer avaliable")
return
Expand Down
4 changes: 3 additions & 1 deletion pkg/bench/loader/loader_test.go
Expand Up @@ -2,6 +2,7 @@ package loader

import (
"context"
"github.com/xephonhq/xephon-k/pkg/bench"
"testing"
"time"
)
Expand All @@ -10,7 +11,8 @@ func TestHTTPLoader_Run(t *testing.T) {
if testing.Short() {
t.Skip("skip loader run test")
}
ld := NewHTTPLoader(NewConfig())
//ld := NewHTTPLoader(NewConfig(bench.DBInfluxDB))
ld := NewHTTPLoader(NewConfig(bench.DBXephonK))
ld.Run()
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/bench/loader/pkg.go
@@ -1,7 +1,6 @@
package loader

import (
"github.com/xephonhq/xephon-k/pkg/bench"
"github.com/xephonhq/xephon-k/pkg/util"
"time"
)
Expand All @@ -20,15 +19,15 @@ type Config struct {
TargetDB int
}

func NewConfig() Config {
func NewConfig(targetDB int) Config {
return Config{
//Duration: time.Duration(1) * time.Minute,
Duration: time.Duration(5) * time.Second,
BatchSize: 100,
WorkerNum: 10,
QPS: 0,
Timeout: time.Duration(30) * time.Second,
TargetDB: bench.DBInfluxDB,
TargetDB: targetDB,
}
}

Expand Down
23 changes: 23 additions & 0 deletions pkg/bench/serialize/xephonk.go
@@ -0,0 +1,23 @@
package serialize

import (
"bytes"
"encoding/json"

"github.com/xephonhq/xephon-k/pkg/common"
)

type XephonKSerialize struct {
}

// WriteInt implements Serializer
func (xk *XephonKSerialize) WriteInt(series common.IntSeries) []byte {
buf := bytes.NewBufferString("[")
j, err := json.Marshal(series)
if err != nil {
log.Panicf("can't serialize to json: %s", err.Error())
}
buf.Write(j)
buf.WriteString("]")
return buf.Bytes()
}
8 changes: 8 additions & 0 deletions pkg/bench/serialize/xephonk_test.go
@@ -0,0 +1,8 @@
package serialize

import "testing"

func TestXephonKSerialize_WriteInt(t *testing.T) {
xks := XephonKSerialize{}
log.Info(string(xks.WriteInt(createDummyIntPoints())))
}

0 comments on commit 9a81c05

Please sign in to comment.