diff --git a/data/neo4j/main.go b/data/neo4j/main.go index f3d743b..5b9ed6c 100644 --- a/data/neo4j/main.go +++ b/data/neo4j/main.go @@ -17,188 +17,188 @@ // This program is used to read RDF files and load them into Neo4j. package main -import ( - "bufio" - "bytes" - "compress/gzip" - "flag" - "fmt" - "io" - "log" - "os" - "sync" - "sync/atomic" - "time" - - "github.com/dgraph-io/dgraph/query/graph" - "github.com/dgraph-io/dgraph/rdf" - "github.com/dgraph-io/dgraph/types" - "github.com/dgraph-io/dgraph/x" - bolt "github.com/johnnadratowski/golang-neo4j-bolt-driver" -) - -var ( - src = flag.String("r", "", "Gzipped RDF data file.") - concurrent = flag.Int("c", 20, "No of concurrent requests to perform.") - size = flag.Int("m", 200, "No of mutations to send per request.") -) - -func getReader(fname string) (*os.File, *bufio.Reader) { - f, err := os.Open(fname) - if err != nil { - log.Fatal("Unable to open file", err) - } - - r, err := gzip.NewReader(f) - if err != nil { - log.Fatal("Unable to open file", err) - } - - return f, bufio.NewReader(r) -} - -func getStrVal(rnq graph.NQuad) string { - switch rnq.ObjectType { - case 0: - return rnq.ObjectValue.GetStrVal() - case 5: - src := types.Val{types.DateID, rnq.ObjectValue.GetDateVal()} - dst, err := types.Convert(src, types.StringID) - if err != nil { - log.Fatal(err) - } - return dst.Value.(string) - case 6: - src := types.Val{types.DateTimeID, rnq.ObjectValue.GetDatetimeVal()} - dst, err := types.Convert(src, types.StringID) - if err != nil { - log.Fatal(err) - } - return dst.Value.(string) - } - log.Fatal("Types should be one of the above.") - return "" -} - -func makeRequests(d bolt.DriverPool, wg *sync.WaitGroup, rc chan request) { -TRY: - conn, err := d.OpenPool() - if err != nil { - time.Sleep(5 * time.Millisecond) - goto TRY - } - - for r := range rc { - pipeline, err := conn.PreparePipeline(r.req...) - if err != nil { - log.Fatal(err) - continue - } - _, err = pipeline.ExecPipeline(r.params...) - if err != nil { - log.Fatal(err) - } - - atomic.AddUint64(&s.rdfs, uint64(len(r.req))) - atomic.AddUint64(&s.mutations, 1) - - err = pipeline.Close() - if err != nil { - log.Fatal(err) - } - } - conn.Close() - wg.Done() -} - -type request struct { - req []string - params []map[string]interface{} -} - -type state struct { - rdfs uint64 - mutations uint64 - start time.Time -} - -var s state - -func printCounters(ticker *time.Ticker) { - for range ticker.C { - rdfs := atomic.LoadUint64(&s.rdfs) - mutations := atomic.LoadUint64(&s.mutations) - elapsed := time.Since(s.start) - rate := float64(rdfs) / elapsed.Seconds() - fmt.Printf("[Request: %6d] Total RDFs done: %8d RDFs per second: %7.0f\r", mutations, rdfs, rate) - } -} - -func main() { - flag.Parse() - driver, err := bolt.NewDriverPool("bolt://localhost:7687", *concurrent) - if err != nil { - log.Fatal(err) - } - - s.start = time.Now() - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - go printCounters(ticker) - var wg sync.WaitGroup - reqCh := make(chan request, 2*(*size)) - - for i := 0; i < *concurrent; i++ { - wg.Add(1) - go makeRequests(driver, &wg, reqCh) - } - - fmt.Printf("\nProcessing %s\n", *src) - f, bufReader := getReader(*src) - - var strBuf bytes.Buffer - var r request - count := 0 - for { - err = x.ReadLine(bufReader, &strBuf) - if err != nil { - break - } - rnq, err := rdf.Parse(strBuf.String()) - pred := rnq.Predicate - x.Checkf(err, "Unable to parse line: [%v]", strBuf.String()) - if len(rnq.ObjectId) > 0 { - r.req = append(r.req, fmt.Sprintf("MERGE (n { xid: {xid1} }) MERGE (n2 { xid: {xid2} }) MERGE (n) - [r:`%s`] - (n2)", pred)) - r.params = append(r.params, map[string]interface{}{"xid1": rnq.Subject, "xid2": rnq.ObjectId}) - } else { - // Merge will check if a node with this xid as property exists, else create it. In either case it will - // add another property with the predicate. - r.req = append(r.req, fmt.Sprintf("MERGE (n { xid: {xid} }) ON CREATE SET n.`%s` = {val} ON MATCH SET n.`%s` = {val}", pred, pred)) - r.params = append(r.params, map[string]interface{}{"xid": rnq.Subject, "val": getStrVal(rnq)}) - } - count++ - if int(count)%(*size) == 0 { - rc := request{ - req: make([]string, len(r.req)), - params: make([]map[string]interface{}, len(r.params)), - } - copy(rc.params, r.params) - copy(rc.req, r.req) - reqCh <- rc - r.req = r.req[:0] - r.params = r.params[:0] - } - - } - if err != nil && err != io.EOF { - err := x.Errorf("Error while reading file: %v", err) - log.Fatalf("%+v", err) - } - - if len(r.req) > 0 { - reqCh <- r - } - x.Check(f.Close()) - close(reqCh) - wg.Wait() -} +// import ( +// "bufio" +// "bytes" +// "compress/gzip" +// "flag" +// "fmt" +// "io" +// "log" +// "os" +// "sync" +// "sync/atomic" +// "time" + +// "github.com/dgraph-io/dgraph/query/graph" +// "github.com/dgraph-io/dgraph/rdf" +// "github.com/dgraph-io/dgraph/types" +// "github.com/dgraph-io/dgraph/x" +// bolt "github.com/johnnadratowski/golang-neo4j-bolt-driver" +// ) + +// var ( +// src = flag.String("r", "", "Gzipped RDF data file.") +// concurrent = flag.Int("c", 20, "No of concurrent requests to perform.") +// size = flag.Int("m", 200, "No of mutations to send per request.") +// ) + +// func getReader(fname string) (*os.File, *bufio.Reader) { +// f, err := os.Open(fname) +// if err != nil { +// log.Fatal("Unable to open file", err) +// } + +// r, err := gzip.NewReader(f) +// if err != nil { +// log.Fatal("Unable to open file", err) +// } + +// return f, bufio.NewReader(r) +// } + +// func getStrVal(rnq graph.NQuad) string { +// switch rnq.ObjectType { +// case 0: +// return rnq.ObjectValue.GetStrVal() +// case 5: +// src := types.Val{types.DateID, rnq.ObjectValue.GetDateVal()} +// dst, err := types.Convert(src, types.StringID) +// if err != nil { +// log.Fatal(err) +// } +// return dst.Value.(string) +// case 6: +// src := types.Val{types.DateTimeID, rnq.ObjectValue.GetDatetimeVal()} +// dst, err := types.Convert(src, types.StringID) +// if err != nil { +// log.Fatal(err) +// } +// return dst.Value.(string) +// } +// log.Fatal("Types should be one of the above.") +// return "" +// } + +// func makeRequests(d bolt.DriverPool, wg *sync.WaitGroup, rc chan request) { +// TRY: +// conn, err := d.OpenPool() +// if err != nil { +// time.Sleep(5 * time.Millisecond) +// goto TRY +// } + +// for r := range rc { +// pipeline, err := conn.PreparePipeline(r.req...) +// if err != nil { +// log.Fatal(err) +// continue +// } +// _, err = pipeline.ExecPipeline(r.params...) +// if err != nil { +// log.Fatal(err) +// } + +// atomic.AddUint64(&s.rdfs, uint64(len(r.req))) +// atomic.AddUint64(&s.mutations, 1) + +// err = pipeline.Close() +// if err != nil { +// log.Fatal(err) +// } +// } +// conn.Close() +// wg.Done() +// } + +// type request struct { +// req []string +// params []map[string]interface{} +// } + +// type state struct { +// rdfs uint64 +// mutations uint64 +// start time.Time +// } + +// var s state + +// func printCounters(ticker *time.Ticker) { +// for range ticker.C { +// rdfs := atomic.LoadUint64(&s.rdfs) +// mutations := atomic.LoadUint64(&s.mutations) +// elapsed := time.Since(s.start) +// rate := float64(rdfs) / elapsed.Seconds() +// fmt.Printf("[Request: %6d] Total RDFs done: %8d RDFs per second: %7.0f\r", mutations, rdfs, rate) +// } +// } + +// func main() { +// flag.Parse() +// driver, err := bolt.NewDriverPool("bolt://localhost:7687", *concurrent) +// if err != nil { +// log.Fatal(err) +// } + +// s.start = time.Now() +// ticker := time.NewTicker(2 * time.Second) +// defer ticker.Stop() +// go printCounters(ticker) +// var wg sync.WaitGroup +// reqCh := make(chan request, 2*(*size)) + +// for i := 0; i < *concurrent; i++ { +// wg.Add(1) +// go makeRequests(driver, &wg, reqCh) +// } + +// fmt.Printf("\nProcessing %s\n", *src) +// f, bufReader := getReader(*src) + +// var strBuf bytes.Buffer +// var r request +// count := 0 +// for { +// err = x.ReadLine(bufReader, &strBuf) +// if err != nil { +// break +// } +// rnq, err := rdf.Parse(strBuf.String()) +// pred := rnq.Predicate +// x.Checkf(err, "Unable to parse line: [%v]", strBuf.String()) +// if len(rnq.ObjectId) > 0 { +// r.req = append(r.req, fmt.Sprintf("MERGE (n { xid: {xid1} }) MERGE (n2 { xid: {xid2} }) MERGE (n) - [r:`%s`] - (n2)", pred)) +// r.params = append(r.params, map[string]interface{}{"xid1": rnq.Subject, "xid2": rnq.ObjectId}) +// } else { +// // Merge will check if a node with this xid as property exists, else create it. In either case it will +// // add another property with the predicate. +// r.req = append(r.req, fmt.Sprintf("MERGE (n { xid: {xid} }) ON CREATE SET n.`%s` = {val} ON MATCH SET n.`%s` = {val}", pred, pred)) +// r.params = append(r.params, map[string]interface{}{"xid": rnq.Subject, "val": getStrVal(rnq)}) +// } +// count++ +// if int(count)%(*size) == 0 { +// rc := request{ +// req: make([]string, len(r.req)), +// params: make([]map[string]interface{}, len(r.params)), +// } +// copy(rc.params, r.params) +// copy(rc.req, r.req) +// reqCh <- rc +// r.req = r.req[:0] +// r.params = r.params[:0] +// } + +// } +// if err != nil && err != io.EOF { +// err := x.Errorf("Error while reading file: %v", err) +// log.Fatalf("%+v", err) +// } + +// if len(r.req) > 0 { +// reqCh <- r +// } +// x.Check(f.Close()) +// close(reqCh) +// wg.Wait() +// } diff --git a/data/neo4j/query_test.go b/data/neo4j/query_test.go index 32fff95..8af02c8 100644 --- a/data/neo4j/query_test.go +++ b/data/neo4j/query_test.go @@ -3,7 +3,7 @@ package main import ( "context" "fmt" - "log" + // "log" "math/rand" "os" "strconv" @@ -12,31 +12,21 @@ import ( "google.golang.org/grpc" - "github.com/dgraph-io/dgraph/client" - "github.com/dgraph-io/dgraph/query/graph" - bolt "github.com/johnnadratowski/golang-neo4j-bolt-driver" + "github.com/dgraph-io/dgo" + "github.com/dgraph-io/dgo/protos/api" ) -func getDgraphConn(ch chan *grpc.ClientConn) *grpc.ClientConn { - select { - case c := <-ch: - return c - default: - log.Fatal("Ran out of connections in the channel") +func DgraphClient(serviceAddr string) (*dgo.Dgraph, error) { + conn, err := grpc.Dial(serviceAddr, grpc.WithInsecure()) + if err != nil { + return nil, err } - return nil -} -func putDgraphConn(ch chan *grpc.ClientConn, c *grpc.ClientConn) { - select { - case ch <- c: - default: - log.Fatal("Not enough capacity, can't put it in") - } + return dgo.NewDgraphClient(api.NewDgraphClient(conn)), nil } -func getVal() *graph.Value { - return &graph.Value{&graph.Value_StrVal{strconv.Itoa(rand.Int())}} +func getVal() *api.Value { + return &api.Value{Val: &api.Value_StrVal{strconv.Itoa(rand.Int())}} } func BenchmarkDgraph(b *testing.B) { @@ -46,79 +36,71 @@ func BenchmarkDgraph(b *testing.B) { }{ { "Simple", `{ - me(id: m.06pj8) { - type.object.name.en - film.director.film { - film.film.genre { - type.object.name.en + me(func: uid(0xff)) { + name@en + director.film { + genre { + name@en } - type.object.name.en - film.film.initial_release_date + name@en + initial_release_date } } }`, }, { "GetStarted1", `{ - director(allof("type.object.name.en", "steven spielberg")) { - type.object.name.en - film.director.film (order: film.film.initial_release_date) { - type.object.name.en - film.film.initial_release_date + director(func: allofterms(name@en, "steven spielberg")) { + name@en + director.film (orderasc: initial_release_date) { + name@en + initial_release_date } } }`, }, { "GetStarted2", `{ - director(allof("type.object.name.en", "steven spielberg")) { - type.object.name.en - film.director.film (order: film.film.initial_release_date) @filter(geq("film.film.initial_release_date", "1984-08")) { - type.object.name.en - film.film.initial_release_date + director(func: allofterms(name@en, "steven spielberg")) { + name@en + director.film (orderasc: initial_release_date) @filter(ge(initial_release_date, "1984-08")) { + name@en + initial_release_date } } }`, }, { "GetStarted3", `{ - director(allof("type.object.name.en", "steven spielberg")) { - type.object.name.en - film.director.film (order: film.film.initial_release_date) @filter(geq("film.film.initial_release_date", "1990") && leq("film.film.initial_release_date", "2000")) { - type.object.name.en - film.film.initial_release_date + director(func: allofterms(name@en, "steven spielberg")) { + name@en + director.film (orderasc: initial_release_date) @filter(ge(initial_release_date, "1990") AND le(initial_release_date, "2000")) { + name@en + initial_release_date } } }`, }, } - poolSize := 8 - connCh := make(chan *grpc.ClientConn, poolSize) - for i := 0; i < poolSize; i++ { - conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure()) - if err != nil { - b.Fatal(err) - } - putDgraphConn(connCh, conn) + client, err := DgraphClient("localhost:9180") + if err != nil { + b.Fatalf("Error while getting client: %v", err) } - var err error for _, q := range queries { b.Run(fmt.Sprintf("%v-Query", q.name), func(b *testing.B) { - conn := getDgraphConn(connCh) - c := graph.NewDgraphClient(conn) - req := client.Req{} - req.SetQuery(q.query) + txn := client.NewTxn() + defer txn.Discard(context.Background()) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = c.Run(context.Background(), req.Request()) + _, err := txn.Query(context.Background(), q.query) if err != nil { b.Fatalf("Error in getting response from server, %s", err) } } + b.StopTimer() - putDgraphConn(connCh, conn) time.Sleep(time.Second) }) } @@ -126,47 +108,49 @@ func BenchmarkDgraph(b *testing.B) { for _, q := range queries { b.Run(fmt.Sprintf("%v-Query-parallel", q.name), func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { - conn := getDgraphConn(connCh) - c := graph.NewDgraphClient(conn) - req := client.Req{} - req.SetQuery(q.query) + txn := client.NewTxn() + defer txn.Discard(context.Background()) + for pb.Next() { - _, err = c.Run(context.Background(), req.Request()) + _, err := txn.Query(context.Background(), q.query) if err != nil { - b.Fatal("Error in query", err) + b.Fatalf("Error in getting response from server, %s", err) } } - putDgraphConn(connCh, conn) }) time.Sleep(time.Second) }) } - // NQuad mutation which is sent as part of the request. - nq := graph.NQuad{ - Subject: "m.0h_b6x1", - Predicate: "type.object.name.en", - } - for _, q := range queries { b.Run(fmt.Sprintf("%v-QueryAndMutation", q.name), func(b *testing.B) { + // NQuad mutation which is sent as part of the request. + nq := api.NQuad{ + Subject: "_:node", + Predicate: "name", + Lang: "en", + } - conn := getDgraphConn(connCh) - c := graph.NewDgraphClient(conn) - req := client.Req{} - req.SetQuery(q.query) + txn := client.NewTxn() + defer txn.Commit(context.Background()) b.ResetTimer() for i := 0; i < b.N; i++ { + _, err := txn.Query(context.Background(), q.query) + if err != nil { + b.Fatalf("Error in getting response from server, %s", err) + } + nq.ObjectValue = getVal() - req.AddMutation(nq, client.SET) - _, err := c.Run(context.Background(), req.Request()) + mu := api.Mutation{ + Set: []*api.NQuad{&nq}, + } + _, err = txn.Mutate(context.Background(), &mu) if err != nil { b.Fatalf("Error in getting response from server, %s", err) } } b.StopTimer() - putDgraphConn(connCh, conn) time.Sleep(time.Second) }) } @@ -175,145 +159,157 @@ func BenchmarkDgraph(b *testing.B) { b.Run(fmt.Sprintf("%v-QueryAndMutation-parallel", q.name), func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { - conn := getDgraphConn(connCh) - c := graph.NewDgraphClient(conn) - req := client.Req{} - req.SetQuery(q.query) + // NQuad mutation which is sent as part of the request. + nq := api.NQuad{ + Subject: "_:node", + Predicate: "name", + Lang: "en", + } + + txn := client.NewTxn() + defer txn.Commit(context.Background()) + for pb.Next() { + _, err := txn.Query(context.Background(), q.query) + if err != nil { + b.Fatalf("Error in getting response from server, %s", err) + } + nq.ObjectValue = getVal() - req.AddMutation(nq, client.SET) - _, err = c.Run(context.Background(), req.Request()) + mu := api.Mutation{ + Set: []*api.NQuad{&nq}, + } + _, err = txn.Mutate(context.Background(), &mu) if err != nil { - b.Fatal("Error in query", err) + b.Fatalf("Error in getting response from server, %s", err) } } - putDgraphConn(connCh, conn) }) time.Sleep(time.Second) }) } } -func getNeoConn(ch chan bolt.Conn) bolt.Conn { - select { - case c := <-ch: - return c - default: - log.Fatal("Ran out of connections in the channel") - } - return nil -} +// func getNeoConn(ch chan bolt.Conn) bolt.Conn { +// select { +// case c := <-ch: +// return c +// default: +// log.Fatal("Ran out of connections in the channel") +// } +// return nil +// } -func putNeoConn(ch chan bolt.Conn, c bolt.Conn) { - select { - case ch <- c: - default: - log.Fatal("Not enough capacity, can't put it in") - } -} +// func putNeoConn(ch chan bolt.Conn, c bolt.Conn) { +// select { +// case ch <- c: +// default: +// log.Fatal("Not enough capacity, can't put it in") +// } +// } -func BenchmarkNeo(b *testing.B) { - queries := []struct { - name string - query string - }{ - {"Simple", `MATCH (d: Director) - [r:FILMS] -> (f:Film) - [r2:GENRE] -> (g:Genre) WHERE d.directorId="m.06pj8" RETURN d, f, g`}, - {"GetStarted1", `MATCH (d: Director) - [r:FILMS] -> (f:Film) WHERE d.name CONTAINS "Steven Spielberg" WITH d,f ORDER BY f.release_date ASC RETURN d, f`}, - {"GetStarted2", `MATCH (d: Director) - [r:FILMS] -> (f:Film) WHERE d.name CONTAINS "Steven Spielberg" AND f.release_date >= "1984-08" WITH d,f ORDER BY f.release_date ASC RETURN d, f`}, - {"GetStarted3", `MATCH (d: Director) - [r:FILMS] -> (f:Film) WHERE d.name CONTAINS "Steven Spielberg" AND f.release_date >= "1984" AND f.release_date <= "2000" WITH d,f ORDER BY f.release_date ASC RETURN d, f`}, - } +// func BenchmarkNeo(b *testing.B) { +// queries := []struct { +// name string +// query string +// }{ +// {"Simple", `MATCH (d: Director) - [r:FILMS] -> (f:Film) - [r2:GENRE] -> (g:Genre) WHERE d.directorId="m.06pj8" RETURN d, f, g`}, +// {"GetStarted1", `MATCH (d: Director) - [r:FILMS] -> (f:Film) WHERE d.name CONTAINS "Steven Spielberg" WITH d,f ORDER BY f.release_date ASC RETURN d, f`}, +// {"GetStarted2", `MATCH (d: Director) - [r:FILMS] -> (f:Film) WHERE d.name CONTAINS "Steven Spielberg" AND f.release_date >= "1984-08" WITH d,f ORDER BY f.release_date ASC RETURN d, f`}, +// {"GetStarted3", `MATCH (d: Director) - [r:FILMS] -> (f:Film) WHERE d.name CONTAINS "Steven Spielberg" AND f.release_date >= "1984" AND f.release_date <= "2000" WITH d,f ORDER BY f.release_date ASC RETURN d, f`}, +// } - driver := bolt.NewDriver() - poolSize := 8 - connCh := make(chan bolt.Conn, poolSize) - for i := 0; i < poolSize; i++ { - conn, err := driver.OpenNeo("bolt://localhost:7687") - if err != nil { - b.Fatal(err) - } - putNeoConn(connCh, conn) - } - mutation := `MATCH (n:Film { filmId: {id} }) SET n.name = {name}` - var err error +// driver := bolt.NewDriver() +// poolSize := 8 +// connCh := make(chan bolt.Conn, poolSize) +// for i := 0; i < poolSize; i++ { +// conn, err := driver.OpenNeo("bolt://localhost:7687") +// if err != nil { +// b.Fatal(err) +// } +// putNeoConn(connCh, conn) +// } +// mutation := `MATCH (n:Film { filmId: {id} }) SET n.name = {name}` +// var err error - for _, q := range queries { - b.Run(fmt.Sprintf("%v-Query", q.name), func(b *testing.B) { - conn := getNeoConn(connCh) - b.ResetTimer() +// for _, q := range queries { +// b.Run(fmt.Sprintf("%v-Query", q.name), func(b *testing.B) { +// conn := getNeoConn(connCh) +// b.ResetTimer() - for i := 0; i < b.N; i++ { - _, _, _, err = conn.QueryNeoAll(q.query, nil) - if err != nil { - b.Fatal(err) - } - } - b.StopTimer() - putNeoConn(connCh, conn) - time.Sleep(time.Second) - }) - } +// for i := 0; i < b.N; i++ { +// _, _, _, err = conn.QueryNeoAll(q.query, nil) +// if err != nil { +// b.Fatal(err) +// } +// } +// b.StopTimer() +// putNeoConn(connCh, conn) +// time.Sleep(time.Second) +// }) +// } - for _, q := range queries { - b.Run(fmt.Sprintf("%v-Query-parallel", q.name), func(b *testing.B) { - b.RunParallel(func(pb *testing.PB) { - conn := getNeoConn(connCh) - for pb.Next() { - _, _, _, err = conn.QueryNeoAll(q.query, nil) - if err != nil { - b.Fatal(err) - } - } - putNeoConn(connCh, conn) - }) - time.Sleep(time.Second) - }) - } +// for _, q := range queries { +// b.Run(fmt.Sprintf("%v-Query-parallel", q.name), func(b *testing.B) { +// b.RunParallel(func(pb *testing.PB) { +// conn := getNeoConn(connCh) +// for pb.Next() { +// _, _, _, err = conn.QueryNeoAll(q.query, nil) +// if err != nil { +// b.Fatal(err) +// } +// } +// putNeoConn(connCh, conn) +// }) +// time.Sleep(time.Second) +// }) +// } - for _, q := range queries { - b.Run(fmt.Sprintf("%v-QueryAndMutation", q.name), func(b *testing.B) { - conn := getNeoConn(connCh) - params := map[string]interface{}{"id": "m.0h_b6x1", "name": "Terminal"} - b.ResetTimer() +// for _, q := range queries { +// b.Run(fmt.Sprintf("%v-QueryAndMutation", q.name), func(b *testing.B) { +// conn := getNeoConn(connCh) +// params := map[string]interface{}{"id": "m.0h_b6x1", "name": "Terminal"} +// b.ResetTimer() - for i := 0; i < b.N; i++ { - _, _, _, err = conn.QueryNeoAll(q.query, nil) - if err != nil { - b.Fatal(err) - } - params["name"] = strconv.Itoa(rand.Int()) - _, err = conn.ExecNeo(mutation, params) - if err != nil { - b.Fatal(err) - } - } - b.StopTimer() - putNeoConn(connCh, conn) - time.Sleep(time.Second) - }) - } +// for i := 0; i < b.N; i++ { +// _, _, _, err = conn.QueryNeoAll(q.query, nil) +// if err != nil { +// b.Fatal(err) +// } +// params["name"] = strconv.Itoa(rand.Int()) +// _, err = conn.ExecNeo(mutation, params) +// if err != nil { +// b.Fatal(err) +// } +// } +// b.StopTimer() +// putNeoConn(connCh, conn) +// time.Sleep(time.Second) +// }) +// } - for _, q := range queries { - b.Run(fmt.Sprintf("%v-QueryAndMutation-parallel", q.name), func(b *testing.B) { - b.RunParallel(func(pb *testing.PB) { - conn := getNeoConn(connCh) - params := map[string]interface{}{"id": "m.0h_b6x1", "name": "Terminal"} - for pb.Next() { - _, _, _, err = conn.QueryNeoAll(q.query, nil) - if err != nil { - b.Fatal(err) - } - params["name"] = strconv.Itoa(rand.Int()) - _, err = conn.ExecNeo(mutation, params) - if err != nil { - b.Fatal(err) - } - } - putNeoConn(connCh, conn) - }) - time.Sleep(time.Second) - }) - } -} +// for _, q := range queries { +// b.Run(fmt.Sprintf("%v-QueryAndMutation-parallel", q.name), func(b *testing.B) { +// b.RunParallel(func(pb *testing.PB) { +// conn := getNeoConn(connCh) +// params := map[string]interface{}{"id": "m.0h_b6x1", "name": "Terminal"} +// for pb.Next() { +// _, _, _, err = conn.QueryNeoAll(q.query, nil) +// if err != nil { +// b.Fatal(err) +// } +// params["name"] = strconv.Itoa(rand.Int()) +// _, err = conn.ExecNeo(mutation, params) +// if err != nil { +// b.Fatal(err) +// } +// } +// putNeoConn(connCh, conn) +// }) +// time.Sleep(time.Second) +// }) +// } +// } func TestMain(m *testing.M) { rand.Seed(time.Now().UnixNano())