forked from couchbase/gocbcore
/
agentops_internal.go
225 lines (197 loc) · 5.9 KB
/
agentops_internal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package gocbcore
import (
"encoding/binary"
"github.com/opentracing/opentracing-go"
)
// GetMetaOptions encapsulates the parameters for a GetMetaEx operation.
type GetMetaOptions struct {
// Key is the document key; GetMetaEx sends it as the request key.
Key []byte
// TraceContext carries the caller's opentracing span context for this
// operation.
TraceContext opentracing.SpanContext
}
// GetMetaResult encapsulates the result of a GetMetaEx operation.
//
// Apart from Value and Cas, every field is decoded by GetMetaEx from the
// 21-byte, big-endian response extras.
type GetMetaResult struct {
// Value is the value section of the response packet.
Value []byte
// Flags is read from extras bytes [4:8].
Flags uint32
// Cas is the CAS carried in the response packet header.
Cas Cas
// Expiry is read from extras bytes [8:12].
Expiry uint32
// SeqNo is read from extras bytes [12:20].
SeqNo SeqNo
// Datatype is the single extras byte at offset 20.
Datatype uint8
// Deleted is read from extras bytes [0:4].
// NOTE(review): presumably non-zero means the document is deleted; confirm
// against the server's GET_META documentation.
Deleted uint32
}
// GetMetaExCallback is invoked upon completion of a GetMetaEx operation.
// GetMetaEx calls it with a nil result and a non-nil error on failure, or with
// a populated result and a nil error on success.
type GetMetaExCallback func(*GetMetaResult, error)
// GetMetaEx retrieves a document along with some internal Couchbase meta-data.
//
// The operation's outcome is reported through cb: a nil result and the
// underlying error if the request failed, a nil result and ErrProtocol if the
// response extras are not exactly 21 bytes long, or a populated GetMetaResult
// and a nil error otherwise. The returned PendingOp and error are whatever
// agent.dispatchOp returns for the queued request.
func (agent *Agent) GetMetaEx(opts GetMetaOptions, cb GetMetaExCallback) (PendingOp, error) {
	// Parent the operation span on the caller-supplied trace context rather
	// than discarding it.
	tracer := agent.createOpTrace("GetMetaEx", opts.TraceContext)

	handler := func(resp *memdQResponse, req *memdQRequest, err error) {
		// The span is finished before cb is invoked on every path.
		tracer.Finish()

		if err != nil {
			cb(nil, err)
			return
		}

		// Expected extras size: 4 (deleted) + 4 (flags) + 4 (expiry) +
		// 8 (seqno) + 1 (datatype).
		const respExtrasLen = 21
		if len(resp.Extras) != respExtrasLen {
			cb(nil, ErrProtocol)
			return
		}

		// All multi-byte fields are big-endian.
		extras := resp.Extras
		cb(&GetMetaResult{
			Value:    resp.Value,
			Flags:    binary.BigEndian.Uint32(extras[4:]),
			Cas:      Cas(resp.Cas),
			Expiry:   binary.BigEndian.Uint32(extras[8:]),
			SeqNo:    SeqNo(binary.BigEndian.Uint64(extras[12:])),
			Datatype: extras[20],
			Deleted:  binary.BigEndian.Uint32(extras[0:]),
		}, nil)
	}

	// Single-byte request extras.
	// NOTE(review): the value 2 presumably selects the response format that
	// includes the trailing datatype byte; confirm against the server's
	// GET_META documentation.
	extraBuf := []byte{2}

	req := &memdQRequest{
		memdPacket: memdPacket{
			Magic:    reqMagic,
			Opcode:   cmdGetMeta,
			Datatype: 0,
			Cas:      0,
			Extras:   extraBuf,
			Key:      opts.Key,
			Value:    nil,
		},
		Callback:         handler,
		RootTraceContext: tracer.RootContext(),
	}
	return agent.dispatchOp(req)
}
// SetMetaOptions encapsulates the parameters for a SetMetaEx operation.
//
// Flags, Expiry, RevNo, Cas, Options and Extra are all encoded big-endian into
// the request extras by SetMetaEx; the offsets are noted per field.
type SetMetaOptions struct {
// Key is sent as the request key.
Key []byte
// Value is sent as the request value.
Value []byte
// Extra is appended to the request extras after the 30-byte fixed section;
// its length is written as a uint16 at extras bytes [28:30].
Extra []byte
// Datatype is sent in the request packet's datatype field.
Datatype uint8
// Options is written to extras bytes [24:28].
Options uint32
// Flags is written to extras bytes [0:4].
Flags uint32
// Expiry is written to extras bytes [4:8].
Expiry uint32
// Cas is written to extras bytes [16:24]; the packet header CAS is sent as 0.
Cas Cas
// RevNo is written to extras bytes [8:16].
RevNo uint64
// TraceContext carries the caller's opentracing span context for this
// operation.
TraceContext opentracing.SpanContext
}
// SetMetaResult encapsulates the result of a SetMetaEx operation.
type SetMetaResult struct {
// Cas is the CAS carried in the response packet header.
Cas Cas
// MutationToken is filled in by SetMetaEx only when the response carries at
// least 16 bytes of extras; otherwise it is left as the zero value.
MutationToken MutationToken
}
// SetMetaExCallback is invoked upon completion of a SetMetaEx operation.
// SetMetaEx calls it with a nil result and a non-nil error on failure, or with
// a populated result and a nil error on success.
type SetMetaExCallback func(*SetMetaResult, error)
// SetMetaEx stores a document along with setting some internal Couchbase meta-data.
//
// The operation's outcome is reported through cb: a nil result and the
// underlying error if the request failed, otherwise a SetMetaResult holding the
// response CAS and, when the response extras are at least 16 bytes long, the
// mutation token. The returned PendingOp and error are whatever
// agent.dispatchOp returns for the queued request.
func (agent *Agent) SetMetaEx(opts SetMetaOptions, cb SetMetaExCallback) (PendingOp, error) {
	// Name the span after this operation and parent it on the caller-supplied
	// trace context.
	tracer := agent.createOpTrace("SetMetaEx", opts.TraceContext)

	handler := func(resp *memdQResponse, req *memdQRequest, err error) {
		// The span is finished before cb is invoked on every path.
		tracer.Finish()

		if err != nil {
			cb(nil, err)
			return
		}

		// The mutation token is only decoded when the response carries at
		// least 16 bytes of extras (8-byte vbucket UUID + 8-byte seqno);
		// otherwise the zero-value token is returned.
		mutToken := MutationToken{}
		if len(resp.Extras) >= 16 {
			mutToken.VbId = req.Vbucket
			mutToken.VbUuid = VbUuid(binary.BigEndian.Uint64(resp.Extras[0:]))
			mutToken.SeqNo = SeqNo(binary.BigEndian.Uint64(resp.Extras[8:]))
		}

		cb(&SetMetaResult{
			Cas:           Cas(resp.Cas),
			MutationToken: mutToken,
		}, nil)
	}

	// Request extras layout (big-endian):
	//   [0:4]   flags
	//   [4:8]   expiry
	//   [8:16]  revision number
	//   [16:24] cas
	//   [24:28] options
	//   [28:30] length of opts.Extra
	//   [30:]   opts.Extra
	const fixedExtrasLen = 30
	extraBuf := make([]byte, fixedExtrasLen+len(opts.Extra))
	binary.BigEndian.PutUint32(extraBuf[0:], opts.Flags)
	binary.BigEndian.PutUint32(extraBuf[4:], opts.Expiry)
	binary.BigEndian.PutUint64(extraBuf[8:], opts.RevNo)
	binary.BigEndian.PutUint64(extraBuf[16:], uint64(opts.Cas))
	binary.BigEndian.PutUint32(extraBuf[24:], opts.Options)
	// NOTE(review): the length field is 16 bits wide, so an opts.Extra longer
	// than 65535 bytes is truncated by this conversion.
	binary.BigEndian.PutUint16(extraBuf[28:], uint16(len(opts.Extra)))
	copy(extraBuf[fixedExtrasLen:], opts.Extra)

	req := &memdQRequest{
		memdPacket: memdPacket{
			Magic:    reqMagic,
			Opcode:   cmdSetMeta,
			Datatype: opts.Datatype,
			// The header CAS is sent as 0; opts.Cas travels in the extras.
			Cas:    0,
			Extras: extraBuf,
			Key:    opts.Key,
			Value:  opts.Value,
		},
		Callback:         handler,
		RootTraceContext: tracer.RootContext(),
	}
	return agent.dispatchOp(req)
}
// DeleteMetaOptions encapsulates the parameters for a DeleteMetaEx operation.
//
// Flags, Expiry, RevNo, Cas, Options and Extra are all encoded big-endian into
// the request extras by DeleteMetaEx; the offsets are noted per field.
type DeleteMetaOptions struct {
// Key is sent as the request key.
Key []byte
// Value is sent as the request value.
Value []byte
// Extra is appended to the request extras after the 30-byte fixed section;
// its length is written as a uint16 at extras bytes [28:30].
Extra []byte
// Datatype is sent in the request packet's datatype field.
Datatype uint8
// Options is written to extras bytes [24:28].
Options uint32
// Flags is written to extras bytes [0:4].
Flags uint32
// Expiry is written to extras bytes [4:8].
Expiry uint32
// Cas is written to extras bytes [16:24]; the packet header CAS is sent as 0.
Cas Cas
// RevNo is written to extras bytes [8:16].
RevNo uint64
// TraceContext carries the caller's opentracing span context for this
// operation.
TraceContext opentracing.SpanContext
}
// DeleteMetaResult encapsulates the result of a DeleteMetaEx operation.
type DeleteMetaResult struct {
// Cas is the CAS carried in the response packet header.
Cas Cas
// MutationToken is filled in by DeleteMetaEx only when the response carries
// at least 16 bytes of extras; otherwise it is left as the zero value.
MutationToken MutationToken
}
// DeleteMetaExCallback is invoked upon completion of a DeleteMetaEx operation.
// DeleteMetaEx calls it with a nil result and a non-nil error on failure, or
// with a populated result and a nil error on success.
type DeleteMetaExCallback func(*DeleteMetaResult, error)
// DeleteMetaEx deletes a document along with setting some internal Couchbase meta-data.
//
// The operation's outcome is reported through cb: a nil result and the
// underlying error if the request failed, otherwise a DeleteMetaResult holding
// the response CAS and, when the response extras are at least 16 bytes long,
// the mutation token. The returned PendingOp and error are whatever
// agent.dispatchOp returns for the queued request.
func (agent *Agent) DeleteMetaEx(opts DeleteMetaOptions, cb DeleteMetaExCallback) (PendingOp, error) {
	// Name the span after this operation and parent it on the caller-supplied
	// trace context.
	tracer := agent.createOpTrace("DeleteMetaEx", opts.TraceContext)

	handler := func(resp *memdQResponse, req *memdQRequest, err error) {
		// The span is finished before cb is invoked on every path.
		tracer.Finish()

		if err != nil {
			cb(nil, err)
			return
		}

		// The mutation token is only decoded when the response carries at
		// least 16 bytes of extras (8-byte vbucket UUID + 8-byte seqno);
		// otherwise the zero-value token is returned.
		mutToken := MutationToken{}
		if len(resp.Extras) >= 16 {
			mutToken.VbId = req.Vbucket
			mutToken.VbUuid = VbUuid(binary.BigEndian.Uint64(resp.Extras[0:]))
			mutToken.SeqNo = SeqNo(binary.BigEndian.Uint64(resp.Extras[8:]))
		}

		cb(&DeleteMetaResult{
			Cas:           Cas(resp.Cas),
			MutationToken: mutToken,
		}, nil)
	}

	// Request extras layout (big-endian):
	//   [0:4]   flags
	//   [4:8]   expiry
	//   [8:16]  revision number
	//   [16:24] cas
	//   [24:28] options
	//   [28:30] length of opts.Extra
	//   [30:]   opts.Extra
	const fixedExtrasLen = 30
	extraBuf := make([]byte, fixedExtrasLen+len(opts.Extra))
	binary.BigEndian.PutUint32(extraBuf[0:], opts.Flags)
	binary.BigEndian.PutUint32(extraBuf[4:], opts.Expiry)
	binary.BigEndian.PutUint64(extraBuf[8:], opts.RevNo)
	binary.BigEndian.PutUint64(extraBuf[16:], uint64(opts.Cas))
	binary.BigEndian.PutUint32(extraBuf[24:], opts.Options)
	// NOTE(review): the length field is 16 bits wide, so an opts.Extra longer
	// than 65535 bytes is truncated by this conversion.
	binary.BigEndian.PutUint16(extraBuf[28:], uint16(len(opts.Extra)))
	copy(extraBuf[fixedExtrasLen:], opts.Extra)

	req := &memdQRequest{
		memdPacket: memdPacket{
			Magic:    reqMagic,
			Opcode:   cmdDelMeta,
			Datatype: opts.Datatype,
			// The header CAS is sent as 0; opts.Cas travels in the extras.
			Cas:    0,
			Extras: extraBuf,
			Key:    opts.Key,
			Value:  opts.Value,
		},
		Callback:         handler,
		RootTraceContext: tracer.RootContext(),
	}
	return agent.dispatchOp(req)
}