This repository has been archived by the owner on Feb 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 230
/
handler.go
336 lines (290 loc) · 9.42 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// Copyright 2017 Pilosa Corp.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pilosa
import (
"encoding/json"
"github.com/pilosa/pilosa/v2/tracing"
"github.com/pkg/errors"
)
// QueryRequest represents a request to process a query.
type QueryRequest struct {
	// Index is the index to execute the query against.
	Index string

	// Query is the query string to parse and execute.
	Query string

	// Shards lists the shards to include in the query execution.
	// When empty, all shards are included.
	Shards []uint64

	// ColumnAttrs, when true, asks for column attributes to be returned.
	ColumnAttrs bool

	// ExcludeRowAttrs, when true, asks for row attributes not to be returned.
	ExcludeRowAttrs bool

	// ExcludeColumns, when true, asks for columns not to be returned.
	ExcludeColumns bool

	// Remote is true when the query is part of a larger distributed query,
	// and false when this request is on the originating node.
	Remote bool

	// Profile indicates whether the query should be profiled.
	Profile bool

	// EmbeddedData holds additional data associated with the query, for
	// cases where there are row-style inputs for precomputed values.
	EmbeddedData []*Row
}
// QueryResponse represents a response from a processed query.
type QueryResponse struct {
	// Results holds one result per top-level query call. The concrete
	// type of each element depends on the query; types include Row,
	// RowIdentifiers, GroupCounts, SignedRow, ValCount, Pair, Pairs,
	// bool, and uint64.
	Results []interface{}

	// ColumnAttrSets is the set of column attribute objects matching the
	// IDs returned in Results.
	ColumnAttrSets []*ColumnAttrSet

	// Err is the error encountered during parsing or execution, if any.
	Err error

	// Profile is the profiling data, if any.
	Profile *tracing.Profile
}
// MarshalJSON encodes the response as JSON.
//
// When resp.Err is nil the output is an object with a "results" key, plus
// "columnAttrs" and "profile" keys when those fields are non-empty. When
// resp.Err is set, the output is an object holding only an "error" key
// with the error's message; every other field is left out.
func (resp *QueryResponse) MarshalJSON() ([]byte, error) {
	if resp.Err == nil {
		// Success payload: the error field is not part of the output.
		type successBody struct {
			Results        []interface{}    `json:"results"`
			ColumnAttrSets []*ColumnAttrSet `json:"columnAttrs,omitempty"`
			Profile        *tracing.Profile `json:"profile,omitempty"`
		}
		body := successBody{
			Results:        resp.Results,
			ColumnAttrSets: resp.ColumnAttrSets,
			Profile:        resp.Profile,
		}
		return json.Marshal(body)
	}

	// Failure payload: only the error message is reported.
	type errorBody struct {
		Err string `json:"error"`
	}
	return json.Marshal(errorBody{Err: resp.Err.Error()})
}
// Handler is the interface for the data handler, a wrapper around
// Pilosa's data store.
type Handler interface {
	Serve() error
	Close() error
}

// nopHandler is a Handler whose methods do no work and always return nil.
type nopHandler struct{}

// Serve does nothing and returns nil.
func (nopHandler) Serve() error { return nil }

// Close does nothing and returns nil.
func (nopHandler) Close() error { return nil }

// NopHandler is a no-op implementation of the Handler interface.
var NopHandler Handler = nopHandler{}
// ImportValueRequest describes the import request structure for a value
// (BSI) import.
//
// Note that there are no RowIDs here: BSI values have to be converted
// into row IDs internally.
type ImportValueRequest struct {
	Index          string
	IndexCreatedAt int64
	Field          string
	FieldCreatedAt int64

	// If Shard is MaxUint64 (an impossible shard value), this indicates
	// that the column IDs may come from multiple shards.
	Shard uint64

	// ColumnIDs holds the column IDs, e.g. a weather station ID.
	ColumnIDs  []uint64
	ColumnKeys []string

	// Values holds integer values, e.g. temperature, humidity, or
	// barometric pressure.
	Values       []int64
	FloatValues  []float64
	StringValues []string

	Clear bool
}
// AtomicRecord applies all its Ivr and Ir atomically, in a Tx.
//
// The top-level Shard has to agree with Ivr[i].Shard and Ir[i].Shard for
// every element of both slices. The same goes for the top-level Index:
// all records have to be writes to the same Index. These requirements are
// checked where the record is applied (outside this declaration).
type AtomicRecord struct {
	Index string
	Shard uint64

	// Ivr holds the BSI value imports.
	Ivr []*ImportValueRequest

	// Ir holds the imports for other field types, e.g. single bit.
	Ir []*ImportRequest
}
// Len returns the number of column IDs in the request. Len, Less, and Swap
// together have the shape of sort.Interface, ordering the request by
// column ID.
func (ivr *ImportValueRequest) Len() int {
	return len(ivr.ColumnIDs)
}

// Less reports whether the column ID at index i is smaller than the one at
// index j.
func (ivr *ImportValueRequest) Less(i, j int) bool {
	ids := ivr.ColumnIDs
	return ids[i] < ids[j]
}

// Swap exchanges the column IDs at indexes i and j, and keeps the values
// paired with them by applying the same exchange to the first non-empty
// slice among Values, FloatValues, and StringValues.
func (ivr *ImportValueRequest) Swap(i, j int) {
	ids := ivr.ColumnIDs
	ids[i], ids[j] = ids[j], ids[i]

	// Only one value slice is swapped; the order of the cases matters when
	// more than one happens to be populated.
	switch {
	case len(ivr.Values) > 0:
		ints := ivr.Values
		ints[i], ints[j] = ints[j], ints[i]
	case len(ivr.FloatValues) > 0:
		floats := ivr.FloatValues
		floats[i], floats[j] = floats[j], floats[i]
	case len(ivr.StringValues) > 0:
		strs := ivr.StringValues
		strs[i], strs[j] = strs[j], strs[i]
	}
}
// Validate ensures that the payload of the request is valid. It delegates
// to ValidateWithTimestamp, passing the request's own creation timestamps
// as the expected values, so the timestamp comparison there never fails.
func (ivr *ImportValueRequest) Validate() error {
	indexCreatedAt, fieldCreatedAt := ivr.IndexCreatedAt, ivr.FieldCreatedAt
	return ivr.ValidateWithTimestamp(indexCreatedAt, fieldCreatedAt)
}
// ValidateWithTimestamp ensures that the payload of the request is valid.
//
// It returns an error when:
//   - Index or Field is empty;
//   - both ColumnIDs and ColumnKeys are populated;
//   - more than one of Values, FloatValues, and StringValues is populated.
//
// It returns ErrPreconditionFailed when the request carries non-zero
// values for both IndexCreatedAt and FieldCreatedAt and either one differs
// from the indexCreatedAt / fieldCreatedAt arguments. If either request
// timestamp is zero, the timestamp comparison is skipped.
func (ivr *ImportValueRequest) ValidateWithTimestamp(indexCreatedAt, fieldCreatedAt int64) error {
	if ivr.Index == "" || ivr.Field == "" {
		return errors.Errorf("index and field required, but got '%s' and '%s'", ivr.Index, ivr.Field)
	}

	hasIDs, hasKeys := len(ivr.ColumnIDs) != 0, len(ivr.ColumnKeys) != 0
	if hasIDs && hasKeys {
		return errors.Errorf("must pass either column ids or keys, but not both")
	}

	// Count how many of the three value slices carry data.
	populated := 0
	for _, size := range [...]int{len(ivr.Values), len(ivr.FloatValues), len(ivr.StringValues)} {
		if size != 0 {
			populated++
		}
	}
	if populated > 1 {
		return errors.Errorf("must pass ints, floats, or strings but not multiple")
	}

	// The timestamps are compared only when the request supplies both.
	if ivr.IndexCreatedAt == 0 || ivr.FieldCreatedAt == 0 {
		return nil
	}
	if ivr.IndexCreatedAt != indexCreatedAt || ivr.FieldCreatedAt != fieldCreatedAt {
		return ErrPreconditionFailed
	}
	return nil
}
// ImportColumnAttrsRequest describes the import request structure for a
// ColumnAttr import.
type ImportColumnAttrsRequest struct {
	AttrKey   string
	ColumnIDs []uint64
	AttrVals  []string

	// Shard is a signed integer here, unlike the uint64 shard fields of
	// the other import request types in this file.
	Shard int64

	Index          string
	IndexCreatedAt int64
}
// ImportRequest describes the import request structure for an import.
// BSI fields use ImportValueRequest instead.
type ImportRequest struct {
	Index          string
	IndexCreatedAt int64
	Field          string
	FieldCreatedAt int64
	Shard          uint64

	RowIDs, ColumnIDs   []uint64
	RowKeys, ColumnKeys []string

	Timestamps []int64
	Clear      bool
}
// ValidateWithTimestamp ensures that the payload of the request is valid.
//
// When the request carries non-zero values for both IndexCreatedAt and
// FieldCreatedAt, they must equal the indexCreatedAt and fieldCreatedAt
// arguments; otherwise ErrPreconditionFailed is returned. If either
// request timestamp is zero, no comparison is made and nil is returned.
func (ir *ImportRequest) ValidateWithTimestamp(indexCreatedAt, fieldCreatedAt int64) error {
	// Nothing to compare unless the request supplies both timestamps.
	if ir.IndexCreatedAt == 0 || ir.FieldCreatedAt == 0 {
		return nil
	}

	matches := ir.IndexCreatedAt == indexCreatedAt && ir.FieldCreatedAt == fieldCreatedAt
	if !matches {
		return ErrPreconditionFailed
	}
	return nil
}
// Names of the actions listed for ImportRoaringRequest.Action.
const (
	// RequestActionSet is the "set" action.
	RequestActionSet = "set"
	// RequestActionClear is the "clear" action.
	RequestActionClear = "clear"
	// RequestActionOverwrite is the "overwrite" action.
	RequestActionOverwrite = "overwrite"
)
// ImportRoaringRequest describes the import request structure for an
// import containing roaring-encoded data.
type ImportRoaringRequest struct {
	IndexCreatedAt, FieldCreatedAt int64

	Clear bool

	// Action is one of: set, clear, overwrite.
	Action string

	Block int
	Views map[string][]byte
}
// ValidateWithTimestamp ensures that the payload of the request is valid.
//
// It returns ErrPreconditionFailed when the request carries non-zero
// values for both IndexCreatedAt and FieldCreatedAt and at least one of
// them differs from the corresponding argument. In every other case,
// including when either request timestamp is zero, it returns nil.
func (irr *ImportRoaringRequest) ValidateWithTimestamp(indexCreatedAt, fieldCreatedAt int64) error {
	switch {
	case irr.IndexCreatedAt == 0, irr.FieldCreatedAt == 0:
		// The request does not supply both timestamps: nothing to compare.
		return nil
	case irr.IndexCreatedAt != indexCreatedAt, irr.FieldCreatedAt != fieldCreatedAt:
		return ErrPreconditionFailed
	default:
		return nil
	}
}
// ImportResponse is the structured response of an import.
type ImportResponse struct {
	// Err carries the error as a string.
	Err string
}
// BlockDataRequest describes the structure of a request for fragment
// block data.
type BlockDataRequest struct {
	Index, Field, View string
	Shard, Block       uint64
}
// BlockDataResponse is the structured response of a block data request.
type BlockDataResponse struct {
	RowIDs, ColumnIDs []uint64
}
// TranslateKeysRequest describes the structure of a request for a batch
// of key translations.
type TranslateKeysRequest struct {
	Index, Field string
	Keys         []string

	// NotWritable is an awkward name, kept only for backward
	// compatibility with go-pilosa and idk.
	NotWritable bool
}
// TranslateKeysResponse is the structured response of a key translation
// request.
type TranslateKeysResponse struct {
	// IDs holds the IDs returned for the request.
	IDs []uint64
}
// TranslateIDsRequest describes the structure of a request for a batch of
// ID translations.
type TranslateIDsRequest struct {
	Index, Field string
	IDs          []uint64
}
// TranslateIDsResponse is the structured response of an ID translation
// request.
type TranslateIDsResponse struct {
	// Keys holds the keys returned for the request.
	Keys []string
}
// InspectRequestParams represents the parts of an InspectRequest that
// aren't generic holder filtering attributes.
type InspectRequestParams struct {
	// Containers requests that container details be included.
	Containers bool

	// Checksum requests that checksums be performed.
	Checksum bool
}
// InspectRequest represents a request for a possibly-partial holder
// inspection. It combines a provided holder filter with the
// inspect-specific parameters by embedding both types.
type InspectRequest struct {
	// HolderFilterParams is the holder filter for the inspection.
	HolderFilterParams

	// InspectRequestParams holds the inspect-specific options.
	InspectRequestParams
}
// InspectResponse contains the structured results for an InspectRequest.
// It may some day be expanded to include metadata about views or indexes.
type InspectResponse struct {
	// Fragments holds one entry per reported fragment.
	Fragments []struct {
		Index, Field, View string

		Shard int64
		Path  string
		Info  *FragmentInfo
	}
}