-
Notifications
You must be signed in to change notification settings - Fork 3
/
ndgo.go
244 lines (207 loc) · 6.67 KB
/
ndgo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// Package ndgo <read iNDiGO> provides dgo abstractions and helpers - github.com/ppp225/ndgo
package ndgo
import (
"context"
"time"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/ppp225/go-common"
)
// --------------------------------------- debug ---------------------------------------
const debug = false
// --------------------------------------- core ---------------------------------------
// Txn is a dgo.Txn wrapper with additional diagnostic data
// Helps with Queries, by providing abstractions for dgraph Query and Mutation
type Txn struct {
diag diag
ctx context.Context
txn *dgo.Txn
}
// NewTxn creates new Txn
func NewTxn(txn *dgo.Txn) *Txn {
return &Txn{
ctx: context.Background(),
txn: txn,
}
}
// NewTxnWithContext creates new Txn (with ctx)
func NewTxnWithContext(ctx context.Context, txn *dgo.Txn) *Txn {
return &Txn{
ctx: ctx,
txn: txn,
}
}
// Discard cleans up dgo.Txn resources. Always defer this on creation.
func (v *Txn) Discard() {
v.txn.Discard(v.ctx)
}
// Commit commits dgo.Txn
func (v *Txn) Commit() (err error) {
t := time.Now()
err = v.txn.Commit(v.ctx)
v.diag.addNW(t)
return
}
// Do executes a query followed by one or more mutations.
// Possible to run query without mutations, or vice versa
func (v *Txn) Do(req *api.Request) (resp *api.Response, err error) {
t := time.Now()
common.Log(debug, "Req: %s \n", req.String())
resp, err = v.txn.Do(v.ctx, req)
v.diag.addNW(t)
if err != nil {
return nil, err
}
v.diag.addDB(resp.Latency)
common.Log(debug, "Resp: %s\n---\n", resp.String())
return
}
// Mutate performs dgraph mutation
func (v *Txn) Mutate(mu *api.Mutation) (resp *api.Response, err error) {
t := time.Now()
common.Log(debug, "Mutate JSON: %s %s\n", string(mu.DeleteJson), string(mu.SetJson))
resp, err = v.txn.Mutate(v.ctx, mu)
v.diag.addNW(t)
if err != nil {
return nil, err
}
v.diag.addDB(resp.Latency)
common.Log(debug, "Mutate Resp: %s\n---\n", resp.String())
return
}
// Query performs dgraph query
func (v *Txn) Query(q string) (resp *api.Response, err error) {
t := time.Now()
common.Log(debug, "Query JSON: %s\n", q)
resp, err = v.txn.Query(v.ctx, q)
v.diag.addNW(t)
if err != nil {
return nil, err
}
v.diag.addDB(resp.Latency)
common.Log(debug, "Query Resp: %s\n---\n", resp.String())
return
}
// QueryWithVars performs dgraph query with vars
func (v *Txn) QueryWithVars(q string, vars map[string]string) (resp *api.Response, err error) {
t := time.Now()
common.Log(debug, "QueryWithVars JSON: %s %s\n", q, vars)
resp, err = v.txn.QueryWithVars(v.ctx, q, vars)
v.diag.addNW(t)
if err != nil {
return nil, err
}
v.diag.addDB(resp.Latency)
common.Log(debug, "QueryWithVars Resp: %s\n---\n", resp.String())
return
}
// --------------------------------------- diag ---------------------------------------
// diag contains diagnostic data for timing the transaction
// dbms - database total time - which sums all dgraph resp.Latency and
// nwms - newtwork total time - which is the total time until response
type diag struct {
dbms, nwms float64
}
func (v *diag) addDB(latency *api.Latency) {
v.dbms += v.getQueryLatency(latency)
}
func (v *diag) addNW(start time.Time) {
v.nwms += (float64)(time.Now().Sub(start).Nanoseconds()) / 1e6
}
func (v *diag) getQueryLatency(latency *api.Latency) float64 {
return (float64)((latency.EncodingNs+latency.ParsingNs+latency.ProcessingNs)/1e3) / 1e3
}
// GetDatabaseTime gets time txn spend in db
func (v *Txn) GetDatabaseTime() float64 {
return v.diag.dbms
}
// GetNetworkTime gets total time until response
func (v *Txn) GetNetworkTime() float64 {
return v.diag.nwms
}
// --------------------------------------- set ---------------------------------------
// Setb is equivalent to Mutate using SetJson
func (v *Txn) Setb(json []byte) (resp *api.Response, err error) {
return v.Mutate(&api.Mutation{
SetJson: json,
})
}
// Set is equivalent to Mutate using SetJson
func (v *Txn) Set(json string) (resp *api.Response, err error) {
return v.Setb([]byte(json))
}
// Seti is equivalent to Setb, but it marshalls structs into one slice of mutations
func (v *Txn) Seti(jsonMutations ...interface{}) (resp *api.Response, err error) {
return v.DoSeti("", jsonMutations...)
}
// --------------------------------------- delete ---------------------------------------
// Deleteb is equivalent to Mutate using DeleteJson
func (v *Txn) Deleteb(json []byte) (resp *api.Response, err error) {
return v.Mutate(&api.Mutation{
DeleteJson: json,
})
}
// Delete is equivalent to Mutate using DeleteJson
func (v *Txn) Delete(json string) (resp *api.Response, err error) {
return v.Deleteb([]byte(json))
}
// Deletei is equivalent to Deleteb, but it marshalls structs into one slice of mutations
func (v *Txn) Deletei(jsonMutations ...interface{}) (resp *api.Response, err error) {
return v.Deleteb(interfaces2Bytes(jsonMutations...))
}
// --------------------------------------- do set ---------------------------------------
// DoSetb is equivalent to Do using mutation with SetJson
func (v *Txn) DoSetb(query string, json []byte) (resp *api.Response, err error) {
mutations := []*api.Mutation{
{
SetJson: json,
},
}
return v.Do(&api.Request{
Query: query,
Mutations: mutations,
})
}
// DoSet is equivalent to Do using mutation with SetJson
func (v *Txn) DoSet(query string, json string) (resp *api.Response, err error) {
return v.DoSetb(query, []byte(json))
}
// DoSetbi is equivalent to DoSeti, but it uses single api.Mutation,
// as it marshalls structs into one slice of mutations
func (v *Txn) DoSetbi(query string, jsonMutations ...interface{}) (resp *api.Response, err error) {
return v.DoSetb(query, interfaces2Bytes(jsonMutations...))
}
// DoSeti is equivalent to Do, but it marshalls structs into mutations
func (v *Txn) DoSeti(query string, jsonMutations ...interface{}) (resp *api.Response, err error) {
return v.DoSetbi(query, jsonMutations...)
// TODO: uncomment when dgraph will support multiple mutations.
// mutations := []*api.Mutation{}
// for _, jm := range jsonMutations {
// jsonBytes, err := json.Marshal(jm)
// if err != nil {
// return nil, err
// }
// mu := &api.Mutation{
// SetJson: jsonBytes,
// }
// mutations = append(mutations, mu)
// }
// return v.Do(&api.Request{
// Query: query,
// Mutations: mutations,
// })
}
// DoSetnq is equivalent to Do using mutation with SetNquads
func (v *Txn) DoSetnq(query string, nquads string) (resp *api.Response, err error) {
mutations := []*api.Mutation{
{
SetNquads: []byte(nquads),
},
}
return v.Do(&api.Request{
Query: query,
Mutations: mutations,
})
}
// --------------------------------------- do delete ---------------------------------------
// TODO: