forked from dgraph-io/dgraph
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
128 lines (114 loc) · 3.31 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// This program loads data into Dgraph from gzipped RDF files by sending
// batched mutations over gRPC.
//
// You can run it like
// go build . && ./dgraphloader -r path-to-gzipped-rdf.gz
// The -r flag also accepts a comma-separated list of files.
package main
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"flag"
"fmt"
"io"
"log"
"os"
"strings"
"time"
"google.golang.org/grpc"
"github.com/dgraph-io/dgraph/goclient/client"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/x"
)
// Command-line flags. The values are pointers that are filled in once the
// flag set has been parsed.
var (
	// dgraph is the address of the Dgraph server the loader dials.
	dgraph = flag.String("d", "127.0.0.1:8080", "Dgraph server address")

	// files names the input; main splits the value on commas, so several
	// files may be given in one flag.
	files = flag.String("r", "", "Location of rdf files to load")

	// numRdf and concurrent are handed to client.NewBatchMutation to size
	// the batches and bound the number of in-flight requests.
	numRdf     = flag.Int("m", 1000, "Number of RDF N-Quads to send as part of a mutation.")
	concurrent = flag.Int("c", 100, "Number of concurrent requests to make to Dgraph")
)
// readLine appends the next complete line from r to buf, without its
// end-of-line bytes.
//
// bufio.Reader.ReadLine hands back a line longer than its internal buffer in
// several pieces; the pieces are stitched together in the caller's buffer.
// This avoids the token-size limit of bufio.Scanner and lets the caller reuse
// a single buffer across lines. The first error reported by the reader
// (io.EOF included) is returned unchanged.
func readLine(r *bufio.Reader, buf *bytes.Buffer) error {
	for {
		// The returned chunk aliases bufio's internal storage and is only
		// valid until the next read, so it is copied out immediately.
		chunk, more, err := r.ReadLine()
		if err != nil {
			return err
		}
		buf.Write(chunk)
		if !more {
			return nil
		}
	}
}
// processFile streams the gzipped RDF file at the given path into batch, one
// line at a time.
//
// Every line is parsed with rdf.Parse and queued on batch as a client.SET
// mutation. Open and gzip-header errors go through x.Check, parse and queueing
// errors through log.Fatal, and any read error other than io.EOF through
// x.Checkf.
func processFile(file string, batch *client.BatchMutation) {
	fmt.Printf("\nProcessing %s\n", file)
	f, err := os.Open(file)
	x.Check(err)
	defer f.Close()

	gr, err := gzip.NewReader(f)
	x.Check(err)
	// gzip.Reader.Close does not close the underlying file; that is handled
	// by the deferred f.Close above, which runs after this one.
	defer gr.Close()

	// A single buffer is reused for every line to keep allocations down.
	var buf bytes.Buffer
	bufReader := bufio.NewReader(gr)
	for {
		// Assign to the function-level err: it is inspected after the loop
		// to tell a clean end of input from a real read failure.
		if err = readLine(bufReader, &buf); err != nil {
			break
		}
		nq, parseErr := rdf.Parse(buf.String())
		if parseErr != nil {
			log.Fatal("While parsing RDF: ", parseErr)
		}
		buf.Reset()
		if addErr := batch.AddMutation(nq, client.SET); addErr != nil {
			log.Fatal("While adding mutation to batch: ", addErr)
		}
	}
	// The loop exits only on a read error; io.EOF is the normal end of input.
	if err != io.EOF {
		x.Checkf(err, "Error while reading file")
	}
}
// printCounters writes a one-line progress report to stdout each time ticker
// fires: the mutation count, the RDF count, and the RDF rate taken from
// batch.Counter(). The report ends in a carriage return rather than a newline,
// so successive reports overwrite each other on a terminal.
//
// The loop ends only if ticker.C is closed. time.Ticker.Stop does not close
// the channel, so after Stop this function simply stays blocked on the
// receive.
func printCounters(batch *client.BatchMutation, ticker *time.Ticker) {
	const progressFmt = "[Request: %6d] Total RDFs done: %8d RDFs per second: %7.0f\r"
	for {
		if _, ok := <-ticker.C; !ok {
			return
		}
		stats := batch.Counter()
		perSecond := float64(stats.Rdfs) / stats.Elapsed.Seconds()
		fmt.Printf(progressFmt, stats.Mutations, stats.Rdfs, perSecond)
	}
}
// main dials the Dgraph server over gRPC, feeds every file named by the -r
// flag through processFile into one shared batch, flushes the batch, and
// prints a summary of the run.
func main() {
	x.Init()

	// strings.Split returns a one-element slice holding "" for an empty
	// input, so a length check on its result can never catch a missing -r.
	// Test the raw flag value instead. Flags are only read after x.Init
	// returns.
	if *files == "" {
		log.Fatal("No RDF files given; pass a comma-separated list with -r")
	}

	conn, err := grpc.Dial(*dgraph, grpc.WithInsecure())
	x.Checkf(err, "While trying to dial gRPC")
	defer conn.Close()

	batch := client.NewBatchMutation(context.Background(), conn, *numRdf, *concurrent)

	// Report progress every two seconds while the files are being loaded.
	ticker := time.NewTicker(2 * time.Second)
	go printCounters(batch, ticker)

	for _, file := range strings.Split(*files, ",") {
		processFile(file, batch)
	}
	batch.Flush()
	ticker.Stop()

	c := batch.Counter()
	// For runs shorter than a second, report the RDF count itself. Otherwise
	// divide in floating point: truncating the elapsed time to whole seconds
	// first would overstate the rate (1.9s would be treated as 1s).
	var rate uint64
	if secs := c.Elapsed.Seconds(); secs < 1 {
		rate = c.Rdfs
	} else {
		rate = uint64(float64(c.Rdfs) / secs)
	}

	// Lets print an empty line, otherwise Number of Mutations overwrites the previous
	// printed line.
	fmt.Printf("%100s\r", "")
	fmt.Printf("Number of mutations run : %d\n", c.Mutations)
	fmt.Printf("Number of RDFs processed : %d\n", c.Rdfs)
	fmt.Printf("Time spent : %v\n", c.Elapsed)
	fmt.Printf("RDFs processed per second : %d\n", rate)
}