-
Notifications
You must be signed in to change notification settings - Fork 1
/
wrapper.go
118 lines (107 loc) · 3.42 KB
/
wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package nrdgraph
import (
"context"
"github.com/dgraph-io/dgo/protos/api"
newrelic "github.com/newrelic/go-agent"
"google.golang.org/grpc"
)
// datastoreDgraph is the datastore product value stamped on every segment
// created by this package.
var datastoreDgraph = newrelic.DatastoreProduct("Dgraph")
// Wrap returns an api.DgraphClient that forwards every call to dc and, when
// the call's context carries a New Relic transaction, records the call as a
// datastore segment on that transaction.
//
// Each Option in opts is applied, in order, to a fresh options value that the
// returned client keeps for populating its segments.
func Wrap(dc api.DgraphClient, opts ...Option) api.DgraphClient {
	cfg := new(options)
	for i := range opts {
		opts[i](cfg)
	}

	wrapped := &wrappedClient{dc: dc, options: cfg}
	return wrapped
}
// wrappedClient is the api.DgraphClient returned by Wrap. It holds the
// client being wrapped together with the settings built from Wrap's options.
type wrappedClient struct {
// dc is the underlying client that every call is forwarded to.
dc api.DgraphClient
// options supplies the host, id and database name copied into each segment.
options *options
}
// Query forwards the request to the wrapped client. When ctx carries a New
// Relic transaction, the call is recorded as a "Query" datastore segment whose
// parameterized query is the request's query text and whose parameters are the
// request variables plus "start_ts" and "lin_read".
func (c *wrappedClient) Query(ctx context.Context, in *api.Request, opts ...grpc.CallOption) (*api.Response, error) {
	txn := newrelic.FromContext(ctx)
	if txn == nil {
		// Nothing to report to: call straight through.
		return c.dc.Query(ctx, in, opts...)
	}

	vars := in.GetVars()
	// Room for every variable plus the two fixed entries added below.
	queryParams := make(map[string]interface{}, len(vars)+2)
	for name, value := range vars {
		queryParams[name] = value
	}
	queryParams["start_ts"] = in.GetStartTs()
	queryParams["lin_read"] = in.GetLinRead()

	segment := c.createSegment(txn)
	segment.Operation = "Query"
	segment.ParameterizedQuery = in.GetQuery()
	segment.QueryParameters = queryParams
	// Deferred so the segment is ended after the wrapped call has returned.
	defer segment.End()

	return c.dc.Query(ctx, in, opts...)
}
// Mutate forwards the mutation to the wrapped client. When ctx carries a New
// Relic transaction, the call is recorded as a "Mutate" datastore segment
// whose parameters mirror the mutation's fields; the byte-slice payloads are
// converted to strings.
func (c *wrappedClient) Mutate(ctx context.Context, in *api.Mutation, opts ...grpc.CallOption) (*api.Assigned, error) {
	txn := newrelic.FromContext(ctx)
	if txn == nil {
		// Nothing to report to: call straight through.
		return c.dc.Mutate(ctx, in, opts...)
	}

	segment := c.createSegment(txn)
	segment.Operation = "Mutate"

	details := map[string]interface{}{
		"set_json":              string(in.GetSetJson()),
		"delete_json":           string(in.GetDeleteJson()),
		"set_nquads":            string(in.GetSetNquads()),
		"del_nquads":            string(in.GetDelNquads()),
		"set":                   in.GetSet(),
		"del":                   in.GetDel(),
		"start_ts":              in.GetStartTs(),
		"commit_now":            in.GetCommitNow(),
		"ignore_index_conflict": in.GetIgnoreIndexConflict(),
	}
	segment.QueryParameters = details
	// Deferred so the segment is ended after the wrapped call has returned.
	defer segment.End()

	return c.dc.Mutate(ctx, in, opts...)
}
// Alter forwards the operation to the wrapped client. When ctx carries a New
// Relic transaction, the call is recorded as an "Alter" datastore segment with
// the operation's drop_all, drop_attr and schema values as parameters.
func (c *wrappedClient) Alter(ctx context.Context, in *api.Operation, opts ...grpc.CallOption) (*api.Payload, error) {
	txn := newrelic.FromContext(ctx)
	if txn == nil {
		// Nothing to report to: call straight through.
		return c.dc.Alter(ctx, in, opts...)
	}

	segment := c.createSegment(txn)
	segment.Operation = "Alter"

	details := map[string]interface{}{
		"drop_all":  in.GetDropAll(),
		"drop_attr": in.GetDropAttr(),
		"schema":    in.GetSchema(),
	}
	segment.QueryParameters = details
	// Deferred so the segment is ended after the wrapped call has returned.
	defer segment.End()

	return c.dc.Alter(ctx, in, opts...)
}
// CommitOrAbort forwards the transaction context to the wrapped client. When
// ctx carries a New Relic transaction, the call is recorded as a datastore
// segment named "Abort" if the context is marked aborted and "Commit"
// otherwise, with start_ts, commit_ts, keys and lin_read as parameters.
func (c *wrappedClient) CommitOrAbort(ctx context.Context, in *api.TxnContext, opts ...grpc.CallOption) (*api.TxnContext, error) {
	if txn := newrelic.FromContext(ctx); txn != nil {
		seg := c.createSegment(txn)
		seg.Operation = "Commit"
		// Read the flag through the generated getter, like every other field
		// here, so a nil in does not cause a panic in the instrumentation
		// before the wrapped client has had a chance to handle it.
		if in.GetAborted() {
			seg.Operation = "Abort"
		}
		seg.QueryParameters = map[string]interface{}{
			"start_ts":  in.GetStartTs(),
			"commit_ts": in.GetCommitTs(),
			"keys":      in.GetKeys(),
			"lin_read":  in.GetLinRead(),
		}
		// Deferred calls run at function return, so the segment is ended
		// after the wrapped call below has returned.
		defer seg.End()
	}
	return c.dc.CommitOrAbort(ctx, in, opts...)
}
// CheckVersion forwards the check to the wrapped client. When ctx carries a
// New Relic transaction, the call is recorded as a "CheckVersion" datastore
// segment with no query parameters.
func (c *wrappedClient) CheckVersion(ctx context.Context, in *api.Check, opts ...grpc.CallOption) (*api.Version, error) {
	txn := newrelic.FromContext(ctx)
	if txn == nil {
		// Nothing to report to: call straight through.
		return c.dc.CheckVersion(ctx, in, opts...)
	}

	segment := c.createSegment(txn)
	segment.Operation = "CheckVersion"
	// Deferred so the segment is ended after the wrapped call has returned.
	defer segment.End()

	return c.dc.CheckVersion(ctx, in, opts...)
}
// createSegment starts a datastore segment on txn and fills in the fields that
// are the same for every call: the Dgraph product plus the host, id and
// database name held in the client's options. Callers set the operation and
// any query details, and are responsible for ending the segment.
func (c *wrappedClient) createSegment(txn newrelic.Transaction) newrelic.DatastoreSegment {
	var segment newrelic.DatastoreSegment
	segment.StartTime = newrelic.StartSegmentNow(txn)
	segment.Product = datastoreDgraph
	segment.Host = c.options.host
	segment.PortPathOrID = c.options.id
	segment.DatabaseName = c.options.databaseName
	return segment
}