Merge pull request #35 from HarrisChu/enhance_reader_strategy
change csv reader concurrency
HarrisChu committed Nov 1, 2022
2 parents 4ccf72b + a18ebfb commit e2abed0
Showing 7 changed files with 100 additions and 76 deletions.
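
The substance of the change: instead of allocating one data channel per virtual user, the pool now owns a single shared DataCh, and the new Separate strategy starts one CSV reader per client, each forwarding only the rows whose line number matches its remainder modulo the reader count. A minimal sketch of that partitioning idea (invented row data, not the plugin's code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	rows := []string{"r1", "r2", "r3", "r4", "r5", "r6", "r7"}
	out := make(chan string, len(rows)) // shared channel, like the pool's DataCh
	var wg sync.WaitGroup

	divisor := 3 // number of concurrent readers, one per client
	for remainder := 0; remainder < divisor; remainder++ {
		wg.Add(1)
		go func(remainder int) {
			defer wg.Done()
			line := 0
			for _, row := range rows {
				line++
				if line%divisor != remainder {
					continue // this row belongs to another reader
				}
				out <- row
			}
		}(remainder)
	}
	wg.Wait()
	close(out)
	for row := range out {
		fmt.Println(row) // each row arrives exactly once; order depends on scheduling
	}
}

Each row reaches the shared channel exactly once per pass over the data, so concurrent readers divide the file instead of duplicating it.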
6 changes: 3 additions & 3 deletions example/nebula-test-insert-limit-rate.js
@@ -3,13 +3,13 @@
 // 1. each second, 1000 iterations would be made.
 // 2. max concurrent vu is 300.
 // 3. last 30 seconds, so it would run 1000*30 = 30000 iterations.
-// 4. batchSize is 1, so it would insert one recond per iteration.
+// 4. batchSize is 1, so it would insert one record per iteration.
 import nebulaPool from 'k6/x/nebulagraph';
 import { check } from 'k6';
 import { Trend } from 'k6/metrics';
 import { sleep } from 'k6';
 
-var lantencyTrend = new Trend('latency');
+var latencyTrend = new Trend('latency');
 var responseTrend = new Trend('responseTime');
 // initial nebula connect pool
 var pool = nebulaPool.initWithSize("192.168.8.61:9669,192.168.8.62:9669,192.168.8.63:9669", 400, 4000);
@@ -66,7 +66,7 @@ export default function (data) {
     "IsSucceed": (r) => r.isSucceed() === true
   });
   // add trend
-  lantencyTrend.add(response.getLatency());
+  latencyTrend.add(response.getLatency());
   responseTrend.add(response.getResponseTime());
 
 };

73 changes: 38 additions & 35 deletions example/nebula-test-insert.js
@@ -3,8 +3,9 @@ import { check } from 'k6';
 import { Trend } from 'k6/metrics';
 import { sleep } from 'k6';
 
-var lantencyTrend = new Trend('latency');
-var responseTrend = new Trend('responseTime');
+var latencyTrend = new Trend('latency', true);
+var responseTrend = new Trend('responseTime', true);
+var rowSize = new Trend('rowSize');
 // initial nebula connect pool
 var pool = nebulaPool.initWithSize("192.168.8.61:9669,192.168.8.62:9669,192.168.8.63:9669", 400, 4000);
 
@@ -22,44 +23,46 @@ session.execute("USE ldbc")
 // ],
 // };
 
+String.prototype.format = function () {
+	var formatted = this;
+	var data = arguments[0]
+
+	formatted = formatted.replace(/\{(\d+)\}/g, function (match, key) {
+		return data[key]
+	})
+	return formatted
+};
+
 export function setup() {
-    // config csv file
-    pool.configCSV("person.csv", "|", false)
-    // config output file, save every query information
-    pool.configOutput("output.csv")
-    sleep(1)
+	// config csv file
+	pool.configCSV("person.csv", "|", false)
+	// config output file, save every query information
+	pool.configOutput("output.csv")
+	sleep(1)
 }
 
 export default function (data) {
-    // get csv data from csv file
-    let ngql = 'INSERT VERTEX Person(firstName, lastName, gender, birthday, creationDate, locationIP, browserUsed) VALUES '
-    let batches = []
-    let batchSize = 100
-    // batch size 100
-    for (let i = 0; i < batchSize; i++) {
-        let d = session.getData();
-        let values = []
-        // concat the insert value
-        for (let index = 1; index < 8; index++) {
-            let value = '"' + d[index] + '"'
-            values.push(value)
-        }
-        let batch = d[0] + ":(" + values.join(",") + ")"
-        batches.push(batch)
-    }
-    ngql = ngql + batches.join(',')
-    let response = session.execute(ngql)
-    check(response, {
-        "IsSucceed": (r) => r.isSucceed() === true
-    });
-    // add trend
-    lantencyTrend.add(response.getLatency());
-    responseTrend.add(response.getResponseTime());
-
+	// get csv data from csv file
+	let ngql = 'INSERT VERTEX Person(firstName, lastName, gender, birthday, creationDate, locationIP, browserUsed) VALUES '
+	let batches = []
+	let batchSize = 100
+	// batch size 100
+	for (let i = 0; i < batchSize; i++) {
+		let d = session.getData();
+		let value = "{0}:(\"{1}\",\"{2}\", \"{3}\", \"{4}\", datetime(\"{5}\"), \"{6}\", \"{7}\")".format(d)
+		batches.push(value)
+	}
+	ngql = ngql + " " + batches.join(',')
+	let response = session.execute(ngql)
+	check(response, {
+		"IsSucceed": (r) => r.isSucceed() === true
+	});
+	// add trend
+	latencyTrend.add(response.getLatency() / 1000);
+	responseTrend.add(response.getResponseTime() / 1000);
+	rowSize.add(response.getRowSize());
 };
 
 export function teardown() {
-    pool.close()
+	pool.close()
 }
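
The rewritten iteration body swaps the hand-rolled value loop for a small String.prototype.format helper, adds a rowSize trend, and divides latency and response time by 1000; the two Trends are now created with the time flag, so this presumably converts the client's microsecond readings to milliseconds. For readers following along, here is a hedged Go rendering of the string a single format call produces (the sample row is invented):

package main

import "fmt"

func main() {
	// d mirrors one person.csv row: vid, firstName, lastName, gender,
	// birthday, creationDate, locationIP, browserUsed (sample values invented)
	d := []string{"933", "Mahinda", "Perera", "male", "1989-12-03",
		"2010-02-14T15:32:10.447+0000", "119.235.7.103", "Firefox"}

	entry := fmt.Sprintf(`%s:("%s","%s", "%s", "%s", datetime("%s"), "%s", "%s")`,
		d[0], d[1], d[2], d[3], d[4], d[5], d[6], d[7])

	// one iteration concatenates 100 such entries after the INSERT prefix
	fmt.Println("INSERT VERTEX Person(firstName, lastName, gender, birthday, creationDate, locationIP, browserUsed) VALUES " + entry)
}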


4 changes: 2 additions & 2 deletions example/nebula-test-ssl.js
@@ -3,7 +3,7 @@ import { check } from 'k6';
 import { Trend } from 'k6/metrics';
 import { sleep } from 'k6';
 
-var lantencyTrend = new Trend('latency');
+var latencyTrend = new Trend('latency');
 var responseTrend = new Trend('responseTime');
 // initial nebula connect pool
 nebulaPool.newSSLConfig("cert/test.ca.pem", "cert/test.derive.crt", "cert/test.derive.key")
@@ -37,7 +37,7 @@ export default function (data) {
     "IsSucceed": (r) => r.isSucceed() === true
   });
   // add trend
-  lantencyTrend.add(response.getLatency());
+  latencyTrend.add(response.getLatency());
   responseTrend.add(response.getResponseTime());
 
 };

4 changes: 2 additions & 2 deletions example/nebula-test.js
@@ -3,7 +3,7 @@ import { check } from 'k6';
 import { Trend } from 'k6/metrics';
 import { sleep } from 'k6';
 
-var lantencyTrend = new Trend('latency');
+var latencyTrend = new Trend('latency');
 var responseTrend = new Trend('responseTime');
 // initial nebula connect pool
 var pool = nebulaPool.init("192.168.8.152:9669", 400);
@@ -29,7 +29,7 @@ export default function (data) {
     "IsSucceed": (r) => r.isSucceed() === true
   });
   // add trend
-  lantencyTrend.add(response.getLatency());
+  latencyTrend.add(response.getLatency());
   responseTrend.add(response.getResponseTime());
 
 };

23 changes: 22 additions & 1 deletion pkg/common/csv.go
@@ -11,6 +11,8 @@ type (
 		Path       string
 		Delimiter  string
 		WithHeader bool
+		divisor    int
+		remainder  int
 		DataCh     chan<- Data
 	}
 
@@ -29,7 +31,6 @@ func NewCsvReader(path, delimiter string, withHeader bool, dataCh chan<- Data) *
 		WithHeader: withHeader,
 		DataCh:     dataCh,
 	}
-
 }
 
 func NewCsvWriter(path, delimiter string, header []string, dataCh <-chan []string) *CSVWriter {
@@ -41,7 +42,20 @@ func NewCsvWriter(path, delimiter string, header []string, dataCh <-chan []strin
 	}
 }
 
+func (c *CSVReader) SetDivisor(divisor int) {
+	if divisor > 0 {
+		c.divisor = divisor
+	}
+}
+
+func (c *CSVReader) SetRemainder(remainder int) {
+	if remainder >= 0 {
+		c.remainder = remainder
+	}
+}
+
 func (c *CSVReader) ReadForever() error {
+	line := 0
 	file, err := os.Open(c.Path)
 	defer file.Close()
 	if err != nil {
@@ -67,12 +81,19 @@ func (c *CSVReader) ReadForever() error {
 	for {
 		row, err := reader.Read()
 		if err == io.EOF {
+			line = 0
 			file.Seek(offset, 0)
 			row, err = reader.Read()
 		}
 		if err != nil {
 			return
 		}
+		line++
+		if c.divisor > 0 && c.remainder >= 0 {
+			if line%c.divisor != c.remainder {
+				continue
+			}
+		}
 		c.DataCh <- row
 	}
 }()
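
Two details of ReadForever worth noting: a reader that never calls SetDivisor keeps the zero-value divisor of 0, so the modulo filter is skipped entirely and every row is forwarded (the AllInOne path); and on EOF the reader seeks back to the first data row and resets line, so every pass partitions the file identically. A minimal sketch of that cycling filter, with invented rows:

package main

import "fmt"

func main() {
	rows := []string{"a", "b", "c", "d", "e"}
	divisor, remainder := 2, 1 // as set by SetDivisor(2) / SetRemainder(1)
	for pass := 0; pass < 2; pass++ { // ReadForever keeps cycling the file
		line := 0 // reset at EOF, exactly like the new code
		for _, row := range rows {
			line++
			if divisor > 0 && line%divisor != remainder {
				continue // another reader's share
			}
			fmt.Printf("pass %d, line %d: %s\n", pass, line, row)
		}
	}
}

Both passes emit lines 1, 3 and 5, so a reader's share stays stable across wrap-arounds.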
4 changes: 2 additions & 2 deletions pkg/common/types.go
@@ -22,7 +22,7 @@ type (
 		Execute(stmt string) (IGraphResponse, error)
 	}
 
-	// IGraphResponse graph response, just support 3 functions to user.
+	// IGraphResponse graph response, just support some functions to user.
 	IGraphResponse interface {
 		IsSucceed() bool
 		GetLatency() int64
@@ -34,7 +34,7 @@ type (
 	IGraphClientPool interface {
 		IClientPool
 		GetSession(username, password string) (IGraphClient, error)
-		// Init initialize the poop with default channel buffersize
+		// Init initialize the poop with default channel bufferSize
 		Init(address string, concurrent int) (IGraphClientPool, error)
 		InitWithSize(address string, concurrent int, size int) (IGraphClientPool, error)
 		ConfigCSV(path, delimiter string, withHeader bool) error

62 changes: 31 additions & 31 deletions pkg/nebulagraph/client.go
@@ -17,8 +17,8 @@ import (
 type (
 	// GraphPool nebula connection pool
 	GraphPool struct {
-		DataChs  []chan common.Data
-		OutoptCh chan []string
+		DataCh   chan common.Data
+		OutputCh chan []string
 		Version     string
 		csvStrategy csvReaderStrategy
 		initialized bool
@@ -40,7 +40,7 @@ type (
 		password string
 	}
 
-	// Response a wrapper for nebula resultset
+	// Response a wrapper for nebula resultSet
 	Response struct {
 		*wrapper.ResultSet
 		ResponseTime int32
@@ -65,9 +65,9 @@ var _ common.IGraphClient = &GraphClient{}
 var _ common.IGraphClientPool = &GraphPool{}
 
 const (
-	// AllInOne all the vus use the same DataCh
+	// AllInOne read csv sequentially
 	AllInOne csvReaderStrategy = iota
-	// Separate each vu has a seprate DataCh
+	// Separate read csv concurrently
 	Separate
 )
@@ -105,7 +105,7 @@ func NewNebulaGraph() *GraphPool {
 	}
 }
 
-// Init initializes nebula pool with address and concurrent, by default the buffersize is 20000
+// Init initializes nebula pool with address and concurrent, by default the bufferSize is 20000
 func (gp *GraphPool) Init(address string, concurrent int) (common.IGraphClientPool, error) {
 	return gp.InitWithSize(address, concurrent, 20000)
 }
@@ -123,7 +123,9 @@ func (gp *GraphPool) InitWithSize(address string, concurrent int, chanSize int)
 	if err != nil {
 		return nil, err
 	}
+	gp.DataCh = make(chan common.Data, chanSize)
 	gp.initialized = true
+
 	return gp, nil
 }
 
@@ -142,7 +144,7 @@ func (gp *GraphPool) initAndVerifyPool(address string, concurrent int, chanSize
 	}
 	gp.clients = make([]nebula.GraphClient, 0)
 	gp.channelBufferSize = chanSize
-	gp.OutoptCh = make(chan []string, gp.channelBufferSize)
+	gp.OutputCh = make(chan []string, gp.channelBufferSize)
 	return nil
 }
@@ -153,18 +155,31 @@ func (gp *GraphPool) ConfigCsvStrategy(strategy int) {
 
 // ConfigCSV makes the read csv file configuration
 func (gp *GraphPool) ConfigCSV(path, delimiter string, withHeader bool) error {
-	for _, dataCh := range gp.DataChs {
+	dataCh := gp.DataCh
+	if gp.csvStrategy == AllInOne {
 		reader := common.NewCsvReader(path, delimiter, withHeader, dataCh)
 		if err := reader.ReadForever(); err != nil {
 			return err
 		}
+	} else {
+		// read the csv concurrently
+		l := len(gp.clients)
+		for c := 0; c < l; c++ {
+			reader := common.NewCsvReader(path, delimiter, withHeader, dataCh)
+			reader.SetDivisor(l)
+			reader.SetRemainder(c)
+			if err := reader.ReadForever(); err != nil {
+				return err
+			}
+		}
 	}
 
 	return nil
 }
 
 // ConfigOutput makes the output file configuration, would write the execution outputs
 func (gp *GraphPool) ConfigOutput(path string) error {
-	writer := common.NewCsvWriter(path, ",", outputHeader, gp.OutoptCh)
+	writer := common.NewCsvWriter(path, ",", outputHeader, gp.OutputCh)
 	if err := writer.WriteForever(); err != nil {
 		return err
 	}
@@ -208,8 +223,7 @@ func (gp *GraphPool) GetSession(username, password string) (common.IGraphClient,
 	}
 
 	gp.clients = append(gp.clients, client)
-	s := &GraphClient{Client: client, Pool: gp}
-	s.prepareCsvReader()
+	s := &GraphClient{Client: client, Pool: gp, DataCh: gp.DataCh}
 
 	return s, nil
 }
@@ -225,23 +239,6 @@ func (gc *GraphClient) Close() error {
 	return gc.Client.Close()
 }
 
-func (gc *GraphClient) prepareCsvReader() error {
-	np := gc.Pool
-
-	if np.csvStrategy == AllInOne {
-		if len(np.DataChs) == 0 {
-			dataCh := make(chan common.Data, np.channelBufferSize)
-			np.DataChs = append(np.DataChs, dataCh)
-		}
-		gc.DataCh = np.DataChs[0]
-	} else {
-		dataCh := make(chan common.Data, np.channelBufferSize)
-		np.DataChs = append(np.DataChs, dataCh)
-		gc.DataCh = dataCh
-	}
-	return nil
-}
-
 // GetData get data from csv reader
 func (gc *GraphClient) GetData() (common.Data, error) {
 	if gc.DataCh != nil && len(gc.DataCh) != 0 {
@@ -280,7 +277,7 @@ func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) {
 
 	responseTime := int32(time.Since(start) / 1000)
 	// output
-	if gc.Pool.OutoptCh != nil {
+	if gc.Pool.OutputCh != nil {
 		var fr []string
 		if rows != 0 {
 			for _, r := range rs.GetRows() {
@@ -301,7 +298,7 @@ func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) {
 		firstRecord: strings.Join(fr, "|"),
 	}
 	select {
-	case gc.Pool.OutoptCh <- formatOutput(o):
+	case gc.Pool.OutputCh <- formatOutput(o):
 	// abandon if the output chan is full.
 	default:
 	}
@@ -333,5 +330,8 @@ func (r *Response) GetLatency() int64 {
 
 // GetRowSize GetRowSize
 func (r *Response) GetRowSize() int32 {
-	return int32(r.ResultSet.GetRowSize())
+	if r.ResultSet != nil {
+		return int32(r.ResultSet.GetRowSize())
+	}
+	return 0
 }
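
Taken together, the new wiring is: InitWithSize allocates the single shared DataCh; every GetSession hands that channel to the new session; and ConfigCSV starts either one reader (AllInOne) or len(gp.clients) partitioned readers (Separate), all feeding the same channel. Below is a hedged sketch of driving the pool directly from Go; the module path, address, and credentials are placeholders, it assumes GetData is exposed on the returned session (as the scripts' session.getData() suggests), and it needs a live NebulaGraph server to actually run:

package main

import (
	"fmt"

	// assumed module path for this repository; adjust to the real one
	"github.com/vesoft-inc/k6-plugin/pkg/nebulagraph"
)

func main() {
	pool := nebulagraph.NewNebulaGraph()

	// creates the single shared DataCh (buffer 4000), previously one channel per VU
	if _, err := pool.InitWithSize("192.168.8.61:9669", 10, 4000); err != nil {
		panic(err)
	}

	// sessions must exist before ConfigCSV: Separate mode starts one
	// reader per registered client, each with its own remainder
	session, err := pool.GetSession("root", "nebula") // placeholder credentials
	if err != nil {
		panic(err)
	}

	pool.ConfigCsvStrategy(1) // 1 == Separate, 0 == AllInOne
	if err := pool.ConfigCSV("person.csv", "|", false); err != nil {
		panic(err)
	}

	row, _ := session.GetData() // pops one partitioned row from the shared channel
	fmt.Println(row)
}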
