-
Notifications
You must be signed in to change notification settings - Fork 451
/
types.go
447 lines (355 loc) · 15.7 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package encoding
import (
"io"
"time"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"
)
// Encoder is the common contract shared by every kind of encoder.
type Encoder interface {
	// SetSchema installs the schema that a schema-aware encoder needs in order
	// to encode the stream. Between resets it may be called any number of
	// times, which allows the schema to change mid-stream.
	SetSchema(descr namespace.SchemaDescr)

	// Encode appends one datapoint, optionally accompanied by an annotation.
	// A schema-aware encoder must have been given a schema beforehand, via
	// Reset, DiscardReset or SetSchema.
	Encode(dp ts.Datapoint, unit xtime.Unit, annotation ts.Annotation) error

	// Stream exposes the bytes encoded so far through an xio.SegmentReader.
	// The accompanying bool is true when the reader holds data and false when
	// the stream is empty; it is returned so callers are reminded to deal with
	// the empty-stream case explicitly.
	//
	// NB(r): the underlying byte slice is not handed back to the pool until
	// ctx is closed. Always close ctx eventually, otherwise the encoder's
	// buffer is never returned to the pool once the encoder is done with it.
	Stream(ctx context.Context) (xio.SegmentReader, bool)

	// NumEncoded reports how many datapoints have been encoded.
	NumEncoded() int

	// LastEncoded reports the most recently encoded datapoint, which is
	// useful for de-duplicating values. It returns an error when no value has
	// been encoded yet.
	LastEncoded() (ts.Datapoint, error)

	// Len reports the length of the encoded stream, as it would be returned
	// by a call to Stream().
	Len() int

	// Reset resets the encoder's start time and internal state. For
	// schema-aware encoders (such as the proto encoder) it also installs the
	// given schema.
	Reset(t time.Time, capacity int, schema namespace.SchemaDescr)

	// Close closes the encoder, handing it back to its pool when pooled.
	Close()

	// Discard takes ownership of the encoder's data and, when the encoder is
	// pooled, hands the encoder back to its pool.
	Discard() ts.Segment

	// DiscardReset takes ownership of the encoder's data and then resets the
	// encoder so it can be reused. For schema-aware encoders (such as the
	// proto encoder) it also installs the given schema.
	DiscardReset(t time.Time, capacity int, schema namespace.SchemaDescr) ts.Segment
}
// NewEncoderFn is a constructor that builds an Encoder from a start time and
// a byte slice.
type NewEncoderFn func(start time.Time, bytes []byte) Encoder
// Options represents the settings used when encoding time and markers,
// together with the pools that encoders and iterators draw from.
type Options interface {
	// SetDefaultTimeUnit sets the default time unit for the encoder.
	SetDefaultTimeUnit(tu xtime.Unit) Options

	// DefaultTimeUnit returns the default time unit for the encoder.
	DefaultTimeUnit() xtime.Unit

	// SetTimeEncodingSchemes sets the time encoding schemes for different time units.
	SetTimeEncodingSchemes(value map[xtime.Unit]TimeEncodingScheme) Options

	// TimeEncodingSchemes returns the time encoding schemes for different time units.
	TimeEncodingSchemes() TimeEncodingSchemes

	// SetMarkerEncodingScheme sets the marker encoding scheme.
	SetMarkerEncodingScheme(value MarkerEncodingScheme) Options

	// MarkerEncodingScheme returns the marker encoding scheme.
	MarkerEncodingScheme() MarkerEncodingScheme

	// SetEncoderPool sets the encoder pool.
	SetEncoderPool(value EncoderPool) Options

	// EncoderPool returns the encoder pool.
	EncoderPool() EncoderPool

	// SetReaderIteratorPool sets the ReaderIteratorPool.
	SetReaderIteratorPool(value ReaderIteratorPool) Options

	// ReaderIteratorPool returns the ReaderIteratorPool.
	ReaderIteratorPool() ReaderIteratorPool

	// SetBytesPool sets the bytes pool.
	SetBytesPool(value pool.CheckedBytesPool) Options

	// BytesPool returns the bytes pool.
	BytesPool() pool.CheckedBytesPool

	// SetSegmentReaderPool sets the segment reader pool.
	SetSegmentReaderPool(value xio.SegmentReaderPool) Options

	// SegmentReaderPool returns the segment reader pool.
	SegmentReaderPool() xio.SegmentReaderPool

	// SetCheckedBytesWrapperPool sets the checked bytes wrapper pool.
	SetCheckedBytesWrapperPool(value xpool.CheckedBytesWrapperPool) Options

	// CheckedBytesWrapperPool returns the checked bytes wrapper pool.
	CheckedBytesWrapperPool() xpool.CheckedBytesWrapperPool

	// SetByteFieldDictionaryLRUSize sets the ByteFieldDictionaryLRUSize, which
	// controls how many recently seen byte field values are kept in the
	// compression dictionary's LRU when compressing / decompressing byte
	// fields in ProtoBuf messages. Increasing this value can potentially lead
	// to better compression at the cost of using more memory for storing
	// metadata when compressing / decompressing.
	SetByteFieldDictionaryLRUSize(value int) Options

	// ByteFieldDictionaryLRUSize returns the ByteFieldDictionaryLRUSize.
	ByteFieldDictionaryLRUSize() int

	// SetIStreamReaderSizeM3TSZ sets the istream bufio reader size
	// for m3tsz encoding iteration.
	SetIStreamReaderSizeM3TSZ(value int) Options

	// IStreamReaderSizeM3TSZ returns the istream bufio reader size
	// for m3tsz encoding iteration.
	IStreamReaderSizeM3TSZ() int

	// SetIStreamReaderSizeProto sets the istream bufio reader size
	// for proto encoding iteration.
	SetIStreamReaderSizeProto(value int) Options

	// IStreamReaderSizeProto returns the istream bufio reader size
	// for proto encoding iteration.
	IStreamReaderSizeProto() int
}
// Iterator is the common contract for walking over encoded data.
type Iterator interface {
	// Next advances the iterator to the following item.
	Next() bool

	// Current yields the datapoint at the iterator's position, together with
	// its time unit and annotation. Callers must not retain the returned
	// Annotation: it may be invalidated once the iterator calls Next().
	Current() (ts.Datapoint, xtime.Unit, ts.Annotation)

	// Err reports the error the iterator encountered.
	Err() error

	// Close closes the iterator, handing it back to its pool when pooled.
	Close()
}
// ReaderIterator is an Iterator backed by a single reader.
type ReaderIterator interface {
	Iterator

	// Reset points the iterator at a new reader and, for schema-aware
	// iterators, at a new schema.
	Reset(reader io.Reader, schema namespace.SchemaDescr)
}
// MultiReaderIterator is an iterator that iterates in order over a list of
// sets of readers, where each reader is internally ordered but the readers
// are not collectively in order; it also deduplicates datapoints.
type MultiReaderIterator interface {
	Iterator

	// Reset resets the iterator to read from a slice of readers with a new
	// schema (for schema aware iterators).
	// NOTE(review): start and blockSize presumably identify the block the
	// readers cover — confirm against the implementation.
	Reset(readers []xio.SegmentReader, start time.Time,
		blockSize time.Duration, schema namespace.SchemaDescr)

	// ResetSliceOfSlices resets the iterator to read from a slice of slice
	// readers with a new schema (for schema aware iterators).
	ResetSliceOfSlices(
		readers xio.ReaderSliceOfSlicesIterator,
		schema namespace.SchemaDescr,
	)

	// Readers exposes the underlying ReaderSliceOfSlicesIterator
	// for this MultiReaderIterator.
	Readers() xio.ReaderSliceOfSlicesIterator

	// Schema exposes the underlying SchemaDescr for this MultiReaderIterator.
	Schema() namespace.SchemaDescr
}
// SeriesIterator walks a set of per-replica iterators for a single series.
// It de-dupes and merges the replicas' results, and it also applies a time
// filter on top of the values in case some replica returned values outside
// the requested range on either end.
type SeriesIterator interface {
	Iterator

	// ID reports the ID of the series.
	ID() ident.ID

	// Namespace reports the namespace the series belongs to.
	Namespace() ident.ID

	// Tags yields an iterator over the tags associated with the ID.
	Tags() ident.TagIterator

	// Start reports the start of the time filter applied by the iterator.
	Start() time.Time

	// End reports the end of the time filter applied by the iterator.
	End() time.Time

	// Reset re-targets the iterator at a set of iterators coming from
	// different replicas. Note that this set may contain nil entries when
	// some replicas did not return successfully.
	// NB: the SeriesIterator takes ownership of the provided ids, which
	// includes calling `id.Finalize()` upon iter.Close().
	Reset(opts SeriesIteratorOptions)

	// SetIterateEqualTimestampStrategy chooses how a value is selected when
	// different replicas return differing values for the same timestamp.
	// It may be called at any time and takes effect immediately, including
	// for the value the iterator is currently returning.
	SetIterateEqualTimestampStrategy(strategy IterateEqualTimestampStrategy)

	// Replicas exposes the MultiReaderIterator slice underlying this
	// SeriesIterator.
	Replicas() []MultiReaderIterator

	// Stats reports information about this SeriesIterator.
	Stats() (SeriesIteratorStats, error)
}
// SeriesIteratorStats carries descriptive information about a SeriesIterator.
type SeriesIteratorStats struct {
	// ApproximateSizeInBytes is an estimate, in bytes, of how much data the
	// SeriesIterator holds.
	ApproximateSizeInBytes int
}
// SeriesIteratorConsolidator is an optional hook for consolidating the
// replicas of a series iterator that has just been reset.
type SeriesIteratorConsolidator interface {
	// ConsolidateReplicas takes a slice of MultiReaderIterators and returns
	// the consolidated slice, or an error.
	ConsolidateReplicas([]MultiReaderIterator) ([]MultiReaderIterator, error)
}
// SeriesIteratorOptions is a set of options for using a series iterator;
// it is the argument to SeriesIterator.Reset.
type SeriesIteratorOptions struct {
	// ID is the ID of the series. The SeriesIterator takes ownership of it
	// on Reset, including calling `id.Finalize()` upon iter.Close().
	ID ident.ID

	// Namespace is the namespace of the series.
	Namespace ident.ID

	// Tags iterates over the tags associated with ID.
	Tags ident.TagIterator

	// Replicas holds one iterator per replica. Entries may be nil for
	// replicas that did not return successfully.
	Replicas []MultiReaderIterator

	// StartInclusive is the inclusive lower bound of the time filter applied
	// to values returned by the replicas.
	StartInclusive xtime.UnixNano

	// EndExclusive is the exclusive upper bound of the time filter applied
	// to values returned by the replicas.
	EndExclusive xtime.UnixNano

	// IterateEqualTimestampStrategy selects how a value is chosen when
	// different replicas return differing values for the same timestamp.
	IterateEqualTimestampStrategy IterateEqualTimestampStrategy

	// SeriesIteratorConsolidator optionally consolidates the replicas of the
	// newly reset iterator.
	// NOTE(review): presumably nil means no consolidation — confirm.
	SeriesIteratorConsolidator SeriesIteratorConsolidator
}
// SeriesIterators groups several SeriesIterator values and can close them all
// at once.
type SeriesIterators interface {
	// Iters exposes the grouped series iterators as a slice.
	Iters() []SeriesIterator

	// Len reports how many iterators the collection holds.
	Len() int

	// Close closes every iterator in the collection.
	Close()
}
// MutableSeriesIterators is a SeriesIterators whose contents can be modified.
type MutableSeriesIterators interface {
	SeriesIterators

	// Reset resizes the collection to size so it can be reused.
	Reset(size int)

	// Cap reports the capacity of the underlying iterators storage.
	Cap() int

	// SetAt stores iter at position idx.
	SetAt(idx int, iter SeriesIterator)
}
// Decoder is the common contract shared by every kind of decoder.
type Decoder interface {
	// Decode returns a ReaderIterator over the encoded data held by reader.
	Decode(reader io.Reader) ReaderIterator
}

// NewDecoderFn is a constructor that builds a Decoder.
type NewDecoderFn func() Decoder
// EncoderAllocate is the allocation function a pool uses to create Encoders.
type EncoderAllocate func() Encoder

// ReaderIteratorAllocate is the allocation function a pool uses to create
// ReaderIterators from a reader and a schema descriptor.
type ReaderIteratorAllocate func(reader io.Reader, descr namespace.SchemaDescr) ReaderIterator
// IStream wraps a stream that can be read from, bit- or byte-wise.
type IStream interface {
	// Read reads into the given slice, io.Reader-style.
	Read([]byte) (int, error)

	// ReadBit reads a single Bit.
	ReadBit() (Bit, error)

	// ReadByte reads a single byte.
	ReadByte() (byte, error)

	// ReadBits reads numBits bits, returning them packed into a uint64.
	ReadBits(numBits uint) (uint64, error)

	// PeekBits returns the next numBits bits.
	// NOTE(review): presumably this does not advance the stream — confirm
	// against the implementation.
	PeekBits(numBits uint) (uint64, error)

	// RemainingBitsInCurrentByte reports how many bits are left in the byte
	// currently being read.
	RemainingBitsInCurrentByte() uint

	// Reset rebinds the stream to read from r.
	Reset(r io.Reader)
}
// OStream wraps a stream that can be written to, bit- or byte-wise.
type OStream interface {
	// Len reports the length of the stream written so far.
	// NOTE(review): unit (bytes vs bits) not visible here — confirm.
	Len() int

	// Empty reports whether the stream is empty.
	Empty() bool

	// WriteBit appends a single Bit.
	WriteBit(v Bit)

	// WriteBits appends the low numBits bits of v.
	// NOTE(review): "low bits" is inferred from the signature — confirm.
	WriteBits(v uint64, numBits int)

	// WriteByte appends a single byte.
	WriteByte(v byte)

	// WriteBytes appends every byte of bytes.
	WriteBytes(bytes []byte)

	// Write appends bytes, io.Writer-style.
	Write(bytes []byte) (int, error)

	// Reset rebinds the stream to write into buffer.
	Reset(buffer checked.Bytes)

	// Discard returns the stream's checked.Bytes.
	// NOTE(review): presumably the caller takes ownership of the returned
	// bytes — confirm.
	Discard() checked.Bytes

	// Rawbytes exposes the raw byte slice together with an int position.
	Rawbytes() ([]byte, int) // TODO: rename this RawBytes

	// CheckedBytes exposes the checked.Bytes together with an int position.
	CheckedBytes() (checked.Bytes, int)
}
// EncoderPool is a pool of Encoders.
type EncoderPool interface {
	// Init prepares the pool, using alloc to create new encoders.
	Init(alloc EncoderAllocate)

	// Get takes an encoder from the pool.
	Get() Encoder

	// Put hands an encoder back to the pool.
	Put(e Encoder)
}
// ReaderIteratorPool is a pool of ReaderIterators.
type ReaderIteratorPool interface {
	// Init prepares the pool, using alloc to create new iterators.
	Init(alloc ReaderIteratorAllocate)

	// Get takes a ReaderIterator from the pool.
	Get() ReaderIterator

	// Put hands a ReaderIterator back to the pool.
	Put(iter ReaderIterator)
}
// MultiReaderIteratorPool is a pool of MultiReaderIterators.
type MultiReaderIteratorPool interface {
	// Init prepares the pool with the given ReaderIteratorAllocate.
	Init(alloc ReaderIteratorAllocate)

	// Get takes a MultiReaderIterator from the pool.
	Get() MultiReaderIterator

	// Put hands a MultiReaderIterator back to the pool.
	Put(iter MultiReaderIterator)
}
// SeriesIteratorPool is a pool of SeriesIterators.
type SeriesIteratorPool interface {
	// Init prepares the pool for use.
	Init()

	// Get takes a SeriesIterator from the pool.
	Get() SeriesIterator

	// Put hands a SeriesIterator back to the pool.
	Put(iter SeriesIterator)
}
// MutableSeriesIteratorsPool is a pool of MutableSeriesIterators.
type MutableSeriesIteratorsPool interface {
	// Init prepares the pool for use.
	Init()

	// Get takes a MutableSeriesIterators of the requested size from the pool.
	Get(size int) MutableSeriesIterators

	// Put hands a MutableSeriesIterators back to the pool.
	Put(iters MutableSeriesIterators)
}
// MultiReaderIteratorArrayPool is a pool of MultiReaderIterator slices.
type MultiReaderIteratorArrayPool interface {
	// Init prepares the pool for use.
	Init()

	// Get takes a MultiReaderIterator slice of the requested size from the pool.
	Get(size int) []MultiReaderIterator

	// Put hands a MultiReaderIterator slice back to the pool.
	Put(iters []MultiReaderIterator)
}
// IteratorPools bundles the small subset of iterator pools that clients need
// in order to rebuild a SeriesIterator.
type IteratorPools interface {
	// MultiReaderIteratorArray returns the session's MultiReaderIteratorArrayPool.
	MultiReaderIteratorArray() MultiReaderIteratorArrayPool

	// MultiReaderIterator returns the session's MultiReaderIteratorPool.
	MultiReaderIterator() MultiReaderIteratorPool

	// MutableSeriesIterators returns the session's MutableSeriesIteratorsPool.
	MutableSeriesIterators() MutableSeriesIteratorsPool

	// SeriesIterator returns the session's SeriesIteratorPool.
	SeriesIterator() SeriesIteratorPool

	// CheckedBytesWrapper returns the session's CheckedBytesWrapperPool.
	CheckedBytesWrapper() xpool.CheckedBytesWrapperPool

	// ID returns the session's identity pool.
	ID() ident.Pool

	// TagEncoder returns the session's tag encoder pool.
	TagEncoder() serialize.TagEncoderPool

	// TagDecoder returns the session's tag decoder pool.
	TagDecoder() serialize.TagDecoderPool
}