-
Notifications
You must be signed in to change notification settings - Fork 450
/
types.go
1539 lines (1208 loc) · 49.9 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package storage
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/generated/proto/annotation"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/storage/limits/permits"
"github.com/m3db/m3/src/dbnode/storage/repair"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/mmap"
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"
)
// PageToken is an opaque paging token. Callers pass back the token returned
// by a previous paged call (e.g. FetchBlocksMetadataV2) to resume from where
// it left off; a nil token returned from FetchBlocksMetadataV2 means all
// block metadata has been fetched.
type PageToken []byte
// IndexedErrorHandler can handle individual errors based on their index. It
// is used primarily in cases where we need to handle errors in batches, but
// want to avoid an intermediary allocation of []error.
type IndexedErrorHandler interface {
	// HandleError handles err for the batch element at the given index.
	// NOTE(review): index presumably refers to the position of the write
	// within the writes.BatchWriter passed to WriteBatch/WriteTaggedBatch —
	// confirm against implementations.
	HandleError(index int, err error)
}
// IDBatchProcessor is a function that processes a batch of IDs. It is the
// batch processing function that Database.BatchProcessWideQuery applies to
// each batch of matching IDs; it returns an error if processing the batch
// fails.
type IDBatchProcessor func(batch *ident.IDBatch) error
// Database is a time series database.
type Database interface {
	// Options returns the database options.
	Options() Options

	// AssignShardSet sets the shard set assignment and returns immediately.
	AssignShardSet(shardSet sharding.ShardSet)

	// Namespaces returns the namespaces.
	Namespaces() []Namespace

	// Namespace returns the specified namespace. The boolean presumably
	// reports whether the namespace was found (comma-ok form) — confirm.
	Namespace(ns ident.ID) (Namespace, bool)

	// Open will open the database for writing and reading.
	Open() error

	// Close will close the database for writing and reading. Close releases
	// resources held by owned namespaces.
	Close() error

	// ShardSet returns the set of shards currently associated with
	// this database.
	ShardSet() sharding.ShardSet

	// Terminate will close the database for writing and reading. Terminate does
	// NOT release any resources held by owned namespaces, instead relying upon
	// the GC to do so.
	Terminate() error

	// Write writes a value to the database for an ID.
	Write(
		ctx context.Context,
		namespace ident.ID,
		id ident.ID,
		timestamp xtime.UnixNano,
		value float64,
		unit xtime.Unit,
		annotation []byte,
	) error

	// WriteTagged writes a value to the database for an ID with tags.
	WriteTagged(
		ctx context.Context,
		namespace ident.ID,
		id ident.ID,
		tagResolver convert.TagMetadataResolver,
		timestamp xtime.UnixNano,
		value float64,
		unit xtime.Unit,
		annotation []byte,
	) error

	// BatchWriter returns a batch writer for the provided namespace that can
	// be used to issue a batch of writes to either WriteBatch
	// or WriteTaggedBatch.
	//
	// Note that when using the BatchWriter the caller owns the lifecycle of the
	// series IDs: if they're being pooled, it's the caller's responsibility to
	// return them to the appropriate pool. The encoded tags and annotations,
	// however, are owned by the writes.WriteBatch itself and will be finalized
	// when the entire writes.WriteBatch is finalized, because their lifecycle
	// is more complicated.
	// Callers can still control the pooling of the encoded tags and annotations by using
	// the SetFinalizeEncodedTagsFn and SetFinalizeAnnotationFn on the WriteBatch itself.
	BatchWriter(namespace ident.ID, batchSize int) (writes.BatchWriter, error)

	// WriteBatch is the same as Write, but in batch. Per-write errors are
	// reported through errHandler.
	WriteBatch(
		ctx context.Context,
		namespace ident.ID,
		writes writes.BatchWriter,
		errHandler IndexedErrorHandler,
	) error

	// WriteTaggedBatch is the same as WriteTagged, but in batch. Per-write
	// errors are reported through errHandler.
	WriteTaggedBatch(
		ctx context.Context,
		namespace ident.ID,
		writes writes.BatchWriter,
		errHandler IndexedErrorHandler,
	) error

	// QueryIDs resolves the given query into known IDs.
	QueryIDs(
		ctx context.Context,
		namespace ident.ID,
		query index.Query,
		opts index.QueryOptions,
	) (index.QueryResult, error)

	// AggregateQuery resolves the given query into aggregated tags.
	AggregateQuery(
		ctx context.Context,
		namespace ident.ID,
		query index.Query,
		opts index.AggregationOptions,
	) (index.AggregateQueryResult, error)

	// ReadEncoded retrieves encoded segments for an ID.
	ReadEncoded(
		ctx context.Context,
		namespace ident.ID,
		id ident.ID,
		start, end xtime.UnixNano,
	) (series.BlockReaderIter, error)

	// WideQuery performs a wide blockwise query that provides batched results
	// that can exceed query limits.
	// NOTE(review): the []xio.WideEntry return type is provisional, per the
	// FIXME on the signature below.
	WideQuery(
		ctx context.Context,
		namespace ident.ID,
		query index.Query,
		start xtime.UnixNano,
		shards []uint32,
		iterOpts index.IterationOptions,
	) ([]xio.WideEntry, error) // FIXME: change when exact type known.

	// BatchProcessWideQuery runs the given query against the namespace index,
	// iterating in a batchwise fashion across all matching IDs, applying the given
	// IDBatchProcessor batch processing function to each ID discovered.
	BatchProcessWideQuery(
		ctx context.Context,
		n Namespace,
		query index.Query,
		batchProcessor IDBatchProcessor,
		opts index.WideQueryOptions,
	) error

	// FetchBlocks retrieves data blocks for a given id and a list of block
	// start times.
	FetchBlocks(
		ctx context.Context,
		namespace ident.ID,
		shard uint32,
		id ident.ID,
		starts []xtime.UnixNano,
	) ([]block.FetchBlockResult, error)

	// FetchBlocksMetadataV2 retrieves blocks metadata for a given shard, returns the
	// fetched block metadata results, the next page token, and any error encountered.
	// If we have fetched all the block metadata, we return nil as the next page token.
	FetchBlocksMetadataV2(
		ctx context.Context,
		namespace ident.ID,
		shard uint32,
		start, end xtime.UnixNano,
		limit int64,
		pageToken PageToken,
		opts block.FetchBlocksMetadataOptions,
	) (block.FetchBlocksMetadataResults, PageToken, error)

	// Bootstrap bootstraps the database.
	Bootstrap() error

	// IsBootstrapped determines whether the database is bootstrapped.
	IsBootstrapped() bool

	// IsBootstrappedAndDurable determines whether the database is bootstrapped
	// and durable, meaning that it could recover all data in memory using only
	// the local disk.
	IsBootstrappedAndDurable() bool

	// IsOverloaded determines whether the database is overloaded.
	IsOverloaded() bool

	// Repair will issue a repair and return nil on success or error on error.
	Repair() error

	// Truncate truncates data for the given namespace.
	// NOTE(review): the int64 result is presumably the number of series
	// truncated — confirm.
	Truncate(namespace ident.ID) (int64, error)

	// BootstrapState captures and returns a snapshot of the database's
	// bootstrap state.
	BootstrapState() DatabaseBootstrapState

	// FlushState returns the flush state for the specified shard and block start.
	FlushState(namespace ident.ID, shardID uint32, blockStart xtime.UnixNano) (fileOpState, error)

	// AggregateTiles does large tile aggregation from source namespace to target namespace.
	// NOTE(review): the meaning of the int64 result (presumably a count of
	// processed series) should be confirmed against the implementation.
	AggregateTiles(ctx context.Context, sourceNsID, targetNsID ident.ID, opts AggregateTilesOptions) (int64, error)
}
// database is the internal database interface. It extends Database with
// access to, and updates of, the namespaces that this database owns.
type database interface {
	Database

	// OwnedNamespaces returns the namespaces this database owns.
	OwnedNamespaces() ([]databaseNamespace, error)

	// UpdateOwnedNamespaces updates the namespaces this database owns.
	UpdateOwnedNamespaces(namespaces namespace.Map) error
}
// Namespace is a time series database namespace.
type Namespace interface {
	// Options returns the namespace options.
	Options() namespace.Options

	// ID returns the ID of the namespace.
	ID() ident.ID

	// Metadata returns the metadata of the namespace.
	Metadata() namespace.Metadata

	// Schema returns the schema of the namespace.
	Schema() namespace.SchemaDescr

	// NumSeries returns the number of series in the namespace.
	NumSeries() int64

	// Shards returns the shard description.
	Shards() []Shard

	// ReadableShardAt returns a readable (bootstrapped) shard by id.
	ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error)

	// SetIndex sets and enables reverse index for this namespace.
	SetIndex(reverseIndex NamespaceIndex) error

	// Index returns the reverse index backing the namespace, if it exists.
	Index() (NamespaceIndex, error)

	// StorageOptions returns storage options.
	StorageOptions() Options

	// ReadOnly returns true if this Namespace is read only.
	ReadOnly() bool

	// SetReadOnly sets the value of ReadOnly option.
	SetReadOnly(value bool)

	// DocRef returns the doc if already present in a namespace shard.
	DocRef(id ident.ID) (doc.Metadata, bool, error)

	// WideQueryIDs resolves the given query into known IDs in a streaming
	// fashion.
	// NOTE(review): batches of IDs are presumably delivered on collector as
	// they are resolved — confirm the channel ownership/closing contract.
	WideQueryIDs(
		ctx context.Context,
		query index.Query,
		collector chan *ident.IDBatch,
		opts index.WideQueryOptions,
	) error

	// FetchWideEntry retrieves the wide entry for an ID for the
	// block at time start.
	FetchWideEntry(
		ctx context.Context,
		id ident.ID,
		blockStart xtime.UnixNano,
		filter schema.WideEntryFilter,
	) (block.StreamedWideEntry, error)
}
// NamespacesByID orders a slice of namespaces by the raw bytes of their IDs.
// It satisfies sort.Interface, so it can be handed straight to sort.Sort.
type NamespacesByID []Namespace

// Len reports how many namespaces are in the slice.
func (ns NamespacesByID) Len() int {
	return len(ns)
}

// Swap exchanges the namespaces at positions i and j.
func (ns NamespacesByID) Swap(i, j int) {
	ns[i], ns[j] = ns[j], ns[i]
}

// Less reports whether the ID of namespace i sorts strictly before the ID
// of namespace j under a byte-wise lexicographic comparison.
func (ns NamespacesByID) Less(i, j int) bool {
	left := ns[i].ID().Bytes()
	right := ns[j].ID().Bytes()
	// bytes.Compare yields exactly -1 when left < right.
	return bytes.Compare(left, right) == -1
}
// SeriesWrite is a result of a series write.
type SeriesWrite struct {
	// Series is the series the write was applied to.
	Series ts.Series
	// WasWritten presumably reports whether the datapoint was actually
	// written (e.g. not rejected or deduplicated) — confirm.
	WasWritten bool
	// NeedsIndex presumably reports whether the series still needs to be
	// inserted into the reverse index — confirm.
	NeedsIndex bool
	// PendingIndexInsert is the pending index insert for the series; see
	// WritePendingIndexInserts. NOTE(review): presumably only meaningful
	// when NeedsIndex is true — confirm.
	PendingIndexInsert writes.PendingIndexInsert
}
// databaseNamespace is the internal namespace interface. It extends Namespace
// with lifecycle, write, query, flush, snapshot, bootstrap and repair
// operations used by the database.
type databaseNamespace interface {
	Namespace

	// Close will release the namespace resources and close the namespace.
	Close() error

	// AssignShardSet sets the shard set assignment and returns immediately.
	AssignShardSet(shardSet sharding.ShardSet)

	// OwnedShards returns the database shards.
	OwnedShards() []databaseShard

	// Tick performs any regular maintenance operations.
	Tick(c context.Cancellable, startTime xtime.UnixNano) error

	// Write writes a data point.
	Write(
		ctx context.Context,
		id ident.ID,
		timestamp xtime.UnixNano,
		value float64,
		unit xtime.Unit,
		annotation []byte,
	) (SeriesWrite, error)

	// WriteTagged writes a value to the namespace for an ID with tags.
	WriteTagged(
		ctx context.Context,
		id ident.ID,
		tagResolver convert.TagMetadataResolver,
		timestamp xtime.UnixNano,
		value float64,
		unit xtime.Unit,
		annotation []byte,
	) (SeriesWrite, error)

	// QueryIDs resolves the given query into known IDs.
	QueryIDs(
		ctx context.Context,
		query index.Query,
		opts index.QueryOptions,
	) (index.QueryResult, error)

	// AggregateQuery resolves the given query into aggregated tags.
	AggregateQuery(
		ctx context.Context,
		query index.Query,
		opts index.AggregationOptions,
	) (index.AggregateQueryResult, error)

	// ReadEncoded reads data for given id within [start, end).
	ReadEncoded(
		ctx context.Context,
		id ident.ID,
		start, end xtime.UnixNano,
	) (series.BlockReaderIter, error)

	// FetchBlocks retrieves data blocks for a given id and a list of block
	// start times.
	FetchBlocks(
		ctx context.Context,
		shardID uint32,
		id ident.ID,
		starts []xtime.UnixNano,
	) ([]block.FetchBlockResult, error)

	// FetchBlocksMetadataV2 retrieves blocks metadata for the given shard.
	FetchBlocksMetadataV2(
		ctx context.Context,
		shardID uint32,
		start, end xtime.UnixNano,
		limit int64,
		pageToken PageToken,
		opts block.FetchBlocksMetadataOptions,
	) (block.FetchBlocksMetadataResults, PageToken, error)

	// PrepareBootstrap prepares the namespace for bootstrapping by ensuring
	// its shards know which flushed files reside on disk, so that calls
	// to series.LoadBlock(...) will succeed.
	PrepareBootstrap(ctx context.Context) ([]databaseShard, error)

	// Bootstrap marks shards as bootstrapped for the namespace.
	Bootstrap(ctx context.Context, bootstrapResult bootstrap.NamespaceResult) error

	// WarmFlush flushes in-memory WarmWrites.
	WarmFlush(blockStart xtime.UnixNano, flush persist.FlushPreparer) error

	// FlushIndex flushes in-memory index data.
	FlushIndex(
		flush persist.IndexFlush,
	) error

	// ColdFlush flushes unflushed in-memory ColdWrites.
	ColdFlush(
		flush persist.FlushPreparer,
	) error

	// Snapshot snapshots unflushed in-memory WarmWrites.
	Snapshot(blockStart, snapshotTime xtime.UnixNano, flush persist.SnapshotPreparer) error

	// NeedsFlush returns true if the namespace needs a flush for the
	// period: [start, end] (both inclusive).
	// NB: The start/end times are assumed to be aligned to block size boundary.
	NeedsFlush(alignedInclusiveStart xtime.UnixNano, alignedInclusiveEnd xtime.UnixNano) (bool, error)

	// Truncate truncates the in-memory data for this namespace.
	Truncate() (int64, error)

	// Repair repairs the namespace data for a given time range.
	Repair(repairer databaseShardRepairer, tr xtime.Range, opts NamespaceRepairOptions) error

	// BootstrapState returns the namespace's bootstrap state.
	BootstrapState() BootstrapState

	// ShardBootstrapState captures and returns a snapshot of the bootstrap
	// state of the namespace's shards.
	ShardBootstrapState() ShardBootstrapStates

	// FlushState returns the flush state for the specified shard and block start.
	FlushState(shardID uint32, blockStart xtime.UnixNano) (fileOpState, error)

	// SeriesRefResolver returns a series ref resolver, callers
	// must make sure to call the release callback once finished
	// with the reference.
	// NOTE(review): the meaning of the owned result (presumably whether this
	// node owns the shard) should be confirmed against the implementation.
	SeriesRefResolver(
		shardID uint32,
		id ident.ID,
		tags ident.TagIterator,
	) (result bootstrap.SeriesRefResolver, owned bool, err error)

	// WritePendingIndexInserts will write any pending index inserts.
	WritePendingIndexInserts(pending []writes.PendingIndexInsert) error

	// AggregateTiles does large tile aggregation from source namespace into this namespace.
	AggregateTiles(
		ctx context.Context,
		sourceNs databaseNamespace,
		opts AggregateTilesOptions,
	) (int64, error)
}
// NamespaceRepairOptions is a set of repair options for repairing a namespace.
type NamespaceRepairOptions struct {
	// Force presumably forces the repair to run even when it would otherwise
	// be skipped — TODO confirm against the namespace repair implementation.
	Force bool
}
// Shard is a time series database shard.
type Shard interface {
	// ID returns the ID of the shard.
	ID() uint32

	// NumSeries returns the number of series in the shard.
	NumSeries() int64

	// IsBootstrapped returns whether the shard is already bootstrapped.
	IsBootstrapped() bool

	// BootstrapState returns the shard's bootstrap state.
	BootstrapState() BootstrapState

	// OpenStreamingReader creates and opens a streaming fs.DataFileSetReader
	// on the latest volume of the given block.
	OpenStreamingReader(blockStart xtime.UnixNano) (fs.DataFileSetReader, error)

	// TryRetrieveSeriesAndIncrementReaderWriterCount attempts to retrieve a writable series.
	// This increments the reader/writer count and so should be decremented when the series
	// is no longer held.
	TryRetrieveSeriesAndIncrementReaderWriterCount(id ident.ID) (*Entry, WritableSeriesOptions, error)
}
// databaseShard is the internal shard interface. It extends Shard with
// lifecycle, write, read, flush, snapshot, cleanup, bootstrap and repair
// operations.
type databaseShard interface {
	Shard

	// OnEvictedFromWiredList is the same as block.Owner. Had to duplicate
	// it here because mockgen chokes on embedded interfaces sometimes:
	// https://github.com/golang/mock/issues/10
	OnEvictedFromWiredList(id ident.ID, blockStart xtime.UnixNano)

	// Close will release the shard resources and close the shard.
	Close() error

	// Tick performs all async updates.
	Tick(c context.Cancellable, startTime xtime.UnixNano, nsCtx namespace.Context) (tickResult, error)

	// Write writes a value to the shard for an ID.
	Write(
		ctx context.Context,
		id ident.ID,
		timestamp xtime.UnixNano,
		value float64,
		unit xtime.Unit,
		annotation []byte,
		wOpts series.WriteOptions,
	) (SeriesWrite, error)

	// WriteTagged writes a value to the shard for an ID with tags.
	WriteTagged(
		ctx context.Context,
		id ident.ID,
		tagResolver convert.TagMetadataResolver,
		timestamp xtime.UnixNano,
		value float64,
		unit xtime.Unit,
		annotation []byte,
		wOpts series.WriteOptions,
	) (SeriesWrite, error)

	// ReadEncoded reads encoded data for the given id between start and end.
	// NOTE(review): presumably the same [start, end) semantics as
	// databaseNamespace.ReadEncoded — confirm.
	ReadEncoded(
		ctx context.Context,
		id ident.ID,
		start, end xtime.UnixNano,
		nsCtx namespace.Context,
	) (series.BlockReaderIter, error)

	// FetchWideEntry retrieves wide entry for an ID for the
	// block at time start.
	FetchWideEntry(
		ctx context.Context,
		id ident.ID,
		blockStart xtime.UnixNano,
		filter schema.WideEntryFilter,
		nsCtx namespace.Context,
	) (block.StreamedWideEntry, error)

	// FetchBlocks retrieves data blocks for a given id and a list of block
	// start times.
	FetchBlocks(
		ctx context.Context,
		id ident.ID,
		starts []xtime.UnixNano,
		nsCtx namespace.Context,
	) ([]block.FetchBlockResult, error)

	// FetchBlocksForColdFlush fetches blocks for a cold flush. This function
	// informs the series and the buffer that a cold flush for the specified
	// block start is occurring so that it knows to update bucket versions.
	FetchBlocksForColdFlush(
		ctx context.Context,
		seriesID ident.ID,
		start xtime.UnixNano,
		version int,
		nsCtx namespace.Context,
	) (block.FetchBlockResult, error)

	// FetchBlocksMetadataV2 retrieves blocks metadata.
	FetchBlocksMetadataV2(
		ctx context.Context,
		start, end xtime.UnixNano,
		limit int64,
		pageToken PageToken,
		opts block.FetchBlocksMetadataOptions,
	) (block.FetchBlocksMetadataResults, PageToken, error)

	// PrepareBootstrap prepares the shard for bootstrapping by ensuring
	// it knows which flushed files reside on disk.
	PrepareBootstrap(ctx context.Context) error

	// Bootstrap bootstraps the shard after all provided data
	// has been loaded using LoadBootstrapBlocks.
	// NOTE(review): LoadBootstrapBlocks is not a method of this interface;
	// the reference may be stale (LoadBlocks may be meant) — confirm.
	Bootstrap(ctx context.Context, nsCtx namespace.Context) error

	// UpdateFlushStates updates all the flush states for the current shard
	// by checking the file volumes that exist on disk at a point in time.
	UpdateFlushStates()

	// LoadBlocks does the same thing as LoadBootstrapBlocks,
	// except it can be called more than once and after a shard is
	// bootstrapped already.
	// NOTE(review): LoadBootstrapBlocks is not declared on this interface;
	// confirm this comparison is still accurate.
	LoadBlocks(series *result.Map) error

	// WarmFlush flushes the WarmWrites in this shard.
	WarmFlush(
		blockStart xtime.UnixNano,
		flush persist.FlushPreparer,
		nsCtx namespace.Context,
	) error

	// MarkWarmIndexFlushStateSuccessOrError marks the blockStart as
	// success or fail based on the provided err.
	MarkWarmIndexFlushStateSuccessOrError(blockStart xtime.UnixNano, err error)

	// ColdFlush flushes the unflushed ColdWrites in this shard. The returned
	// ShardColdFlush must have Done called to finalize the flush.
	ColdFlush(
		flush persist.FlushPreparer,
		resources coldFlushReusableResources,
		nsCtx namespace.Context,
		onFlush persist.OnFlushSeries,
	) (ShardColdFlush, error)

	// Snapshot snapshots the unflushed WarmWrites in this shard.
	Snapshot(
		blockStart xtime.UnixNano,
		snapshotStart xtime.UnixNano,
		flush persist.SnapshotPreparer,
		nsCtx namespace.Context,
	) (ShardSnapshotResult, error)

	// FlushState returns the flush state for this shard at block start.
	FlushState(blockStart xtime.UnixNano) (fileOpState, error)

	// CleanupExpiredFileSets removes expired fileset files.
	CleanupExpiredFileSets(earliestToRetain xtime.UnixNano) error

	// CleanupCompactedFileSets removes fileset files that have been compacted,
	// meaning that there exists a more recent, superset, fully persisted
	// fileset for that block.
	CleanupCompactedFileSets() error

	// Repair repairs the shard data for a given time range.
	Repair(
		ctx context.Context,
		nsCtx namespace.Context,
		nsMeta namespace.Metadata,
		tr xtime.Range,
		repairer databaseShardRepairer,
	) (repair.MetadataComparisonResult, error)

	// SeriesRefResolver returns a series ref resolver, callers
	// must make sure to call the release callback once finished
	// with the reference.
	SeriesRefResolver(
		id ident.ID,
		tags ident.TagIterator,
	) (bootstrap.SeriesRefResolver, error)

	// DocRef returns the doc if already present in a shard series.
	DocRef(id ident.ID) (doc.Metadata, bool, error)

	// AggregateTiles does large tile aggregation from source shards into this shard.
	AggregateTiles(
		ctx context.Context,
		sourceNs Namespace,
		targetNs Namespace,
		shardID uint32,
		onFlushSeries persist.OnFlushSeries,
		opts AggregateTilesOptions,
	) (int64, error)

	// LatestVolume returns the latest volume for the combination of shard+blockStart.
	LatestVolume(blockStart xtime.UnixNano) (int, error)
}
// ShardSnapshotResult is a result from a shard snapshot.
type ShardSnapshotResult struct {
	// SeriesPersist is presumably the number of series persisted by the
	// snapshot — confirm.
	SeriesPersist int
}
// ShardColdFlush exposes a done method to finalize shard cold flush
// by persisting data and updating shard state/block leases.
type ShardColdFlush interface {
	// Done finalizes the shard cold flush and returns any error encountered
	// while doing so.
	Done() error
}
// NamespaceIndex indexes namespace writes.
type NamespaceIndex interface {
	// AssignShardSet sets the shard set assignment and returns immediately.
	AssignShardSet(shardSet sharding.ShardSet)

	// BlockStartForWriteTime returns the index block start
	// time for the given writeTime.
	BlockStartForWriteTime(
		writeTime xtime.UnixNano,
	) xtime.UnixNano

	// BlockForBlockStart returns an index block for a block start.
	BlockForBlockStart(
		blockStart xtime.UnixNano,
	) (index.Block, error)

	// WriteBatch indexes the provided entries.
	WriteBatch(
		batch *index.WriteBatch,
	) error

	// WritePending indexes the provided pending entries.
	WritePending(
		pending []writes.PendingIndexInsert,
	) error

	// Query resolves the given query into known IDs.
	Query(
		ctx context.Context,
		query index.Query,
		opts index.QueryOptions,
	) (index.QueryResult, error)

	// WideQuery resolves the given query into known IDs.
	// NOTE(review): batches of IDs are presumably streamed to collector —
	// confirm the channel ownership/closing contract.
	WideQuery(
		ctx context.Context,
		query index.Query,
		collector chan *ident.IDBatch,
		opts index.WideQueryOptions,
	) error

	// AggregateQuery resolves the given query into aggregated tags.
	AggregateQuery(
		ctx context.Context,
		query index.Query,
		opts index.AggregationOptions,
	) (index.AggregateQueryResult, error)

	// Bootstrap bootstraps the index with the provided segments.
	Bootstrap(
		bootstrapResults result.IndexResults,
	) error

	// Bootstrapped is true if the bootstrap has completed.
	Bootstrapped() bool

	// CleanupExpiredFileSets removes expired fileset files. Expiration is calculated
	// using the provided `t` as the frame of reference.
	CleanupExpiredFileSets(t xtime.UnixNano) error

	// CleanupCorruptedFileSets removes corrupted fileset files.
	CleanupCorruptedFileSets() error

	// CleanupDuplicateFileSets removes duplicate fileset files.
	CleanupDuplicateFileSets(activeShards []uint32) error

	// Tick performs internal house keeping in the index, including block rotation,
	// data eviction, and so on.
	Tick(c context.Cancellable, startTime xtime.UnixNano) (namespaceIndexTickResult, error)

	// WarmFlush performs any warm flushes that the index has outstanding using
	// the owned shards of the database.
	WarmFlush(
		flush persist.IndexFlush,
		shards []databaseShard,
	) error

	// WarmFlushBlockStarts returns all index blockStarts which have been flushed to disk.
	WarmFlushBlockStarts() []xtime.UnixNano

	// ColdFlush performs any cold flushes that the index has outstanding using
	// the owned shards of the database. Also returns a callback to be called when
	// cold flushing completes to perform housekeeping.
	ColdFlush(shards []databaseShard) (OnColdFlushDone, error)

	// DebugMemorySegments allows for debugging memory segments.
	DebugMemorySegments(opts DebugMemorySegmentsOptions) error

	// BackgroundCompact background compacts eligible segments.
	BackgroundCompact()

	// Close will release the index resources and close the index.
	Close() error
}
// OnColdFlushDone is a callback that performs housekeeping once cold flushing
// completes. It is returned by NamespaceIndex.ColdFlush.
type OnColdFlushDone func() error
// DebugMemorySegmentsOptions is a set of options to debug memory segments.
type DebugMemorySegmentsOptions struct {
	// OutputDirectory is presumably the directory that debug output for the
	// memory segments is written to — confirm.
	OutputDirectory string
}
// namespaceIndexTickResult contains details about the work performed by the
// namespaceIndex during a Tick().
// NOTE(review): field meanings below are inferred from their names; confirm
// exact semantics (and FreeMmap's unit) against namespaceIndex.Tick.
type namespaceIndexTickResult struct {
	NumBlocks               int64
	NumBlocksSealed         int64
	NumBlocksEvicted        int64
	NumSegments             int64
	NumSegmentsBootstrapped int64
	NumSegmentsMutable      int64
	NumTotalDocs            int64
	FreeMmap                int64
}
// namespaceIndexInsertQueue is a queue used in front of the indexing component
// for Writes. NB: this is an interface to allow easier unit tests in namespaceIndex.
type namespaceIndexInsertQueue interface {
	// Start starts accepting writes in the queue.
	Start() error

	// Stop stops accepting writes in the queue.
	Stop() error

	// InsertBatch inserts the provided documents to the index queue which processes
	// inserts to the index asynchronously. It executes the provided callbacks
	// based on the result of the execution. The returned wait group can be used
	// if the insert is required to be synchronous.
	InsertBatch(
		batch *index.WriteBatch,
	) (*sync.WaitGroup, error)

	// InsertPending inserts the provided pending index inserts to the index
	// queue which processes inserts to the index asynchronously. It executes
	// the provided callbacks based on the result of the execution. The
	// returned wait group can be used if the insert is required to be
	// synchronous.
	InsertPending(
		pending []writes.PendingIndexInsert,
	) (*sync.WaitGroup, error)
}
// databaseBootstrapManager manages the bootstrap process.
type databaseBootstrapManager interface {
	// IsBootstrapped returns whether the database is already bootstrapped.
	IsBootstrapped() bool

	// LastBootstrapCompletionTime returns the last bootstrap completion time,
	// if any.
	LastBootstrapCompletionTime() (xtime.UnixNano, bool)

	// Bootstrap performs bootstrapping for all namespaces and shards owned.
	Bootstrap() (BootstrapResult, error)

	// BootstrapEnqueue performs bootstrapping asynchronously for all namespaces and shards owned.
	// The optional opts.OnCompleteFn is invoked once the queued bootstraps complete.
	BootstrapEnqueue(opts BootstrapEnqueueOptions)

	// Report reports runtime information.
	Report()
}
// BootstrapCompleteFn is a callback for when bootstrap is complete when using
// the BootstrapEnqueue method. It receives the result of the bootstrap.
type BootstrapCompleteFn func(BootstrapResult)
// BootstrapEnqueueOptions holds the options to pass to BootstrapEnqueue when
// enqueuing a bootstrap.
type BootstrapEnqueueOptions struct {
	// OnCompleteFn is an optional function to execute once
	// the set of queued bootstraps is complete.
	OnCompleteFn BootstrapCompleteFn
}
// BootstrapResult is a bootstrap result.
type BootstrapResult struct {
	// ErrorsBootstrap holds the errors encountered while bootstrapping.
	ErrorsBootstrap []error
	// AlreadyBootstrapping presumably reports that a bootstrap was already in
	// progress when this one was requested — confirm.
	AlreadyBootstrapping bool
}
// databaseFlushManager manages flushing in-memory data to persistent storage.
type databaseFlushManager interface {
	// Flush flushes in-memory data to persistent storage.
	Flush(startTime xtime.UnixNano) error

	// LastSuccessfulSnapshotStartTime returns the start time of the last
	// successful snapshot, if any. The boolean reports whether one exists.
	LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)

	// Report reports runtime information.
	Report()
}
// databaseCleanupManager manages cleaning up persistent storage space.
// NB(bodu): We have to separate the cleanup methods since we separated out
// flushing into warm/cold flush, and cleaning up certain types of data
// concurrently w/ either can be problematic.
type databaseCleanupManager interface {
	// WarmFlushCleanup cleans up data not needed in the persistent storage before a warm flush.
	WarmFlushCleanup(t xtime.UnixNano) error

	// ColdFlushCleanup cleans up data not needed in the persistent storage before a cold flush.
	ColdFlushCleanup(t xtime.UnixNano) error

	// Report reports runtime information.
	Report()
}
// databaseFileSystemManager manages the database related filesystem activities.
type databaseFileSystemManager interface {
	// Flush flushes in-memory data to persistent storage.
	Flush(t xtime.UnixNano) error

	// Disable disables the filesystem manager and prevents it from
	// performing file operations, returns the current file operation status.
	Disable() fileOpStatus

	// Enable enables the filesystem manager to perform file operations.
	Enable() fileOpStatus

	// Status returns the file operation status.
	Status() fileOpStatus

	// Run attempts to perform all filesystem-related operations,
	// returning true if those operations are performed, and false otherwise.
	Run(t xtime.UnixNano) bool

	// Report reports runtime information.
	Report()

	// LastSuccessfulSnapshotStartTime returns the start time of the last
	// successful snapshot, if any. The boolean reports whether one exists.
	LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool)
}
// databaseColdFlushManager manages the database related cold flush activities.
// It embeds databaseCleanupManager, so it also exposes the warm/cold flush
// cleanup operations.
type databaseColdFlushManager interface {
	databaseCleanupManager

	// Disable disables the cold flush manager and prevents it from
	// performing file operations, returns the current file operation status.
	Disable() fileOpStatus

	// Enable enables the cold flush manager to perform file operations.
	Enable() fileOpStatus

	// Status returns the file operation status.
	Status() fileOpStatus

	// Run attempts to perform all cold flush related operations,
	// returning true if those operations are performed, and false otherwise.
	Run(t xtime.UnixNano) bool
}
// databaseShardRepairer repairs in-memory data for a shard.
type databaseShardRepairer interface {
	// Options returns the repair options.
	Options() repair.Options

	// Repair repairs the data for a given namespace and shard over the time
	// range tr, returning the metadata comparison result.
	Repair(
		ctx context.Context,
		nsCtx namespace.Context,
		nsMeta namespace.Metadata,
		tr xtime.Range,
		shard databaseShard,
	) (repair.MetadataComparisonResult, error)
}
// BackgroundProcess is a background process that is run by the database.
type BackgroundProcess interface {
	// Start launches the BackgroundProcess to run asynchronously.
	Start()

	// Stop stops the BackgroundProcess.
	Stop()

	// Report reports runtime information.
	Report()
}
// FileOpsProcess is a background process that is run by the database.
type FileOpsProcess interface {
	// Start launches the FileOpsProcess to run asynchronously.
	Start()
}

// Compile-time check that a bare function can stand in for a FileOpsProcess.
var _ FileOpsProcess = FileOpsProcessFn(nil)

// FileOpsProcessFn lets an ordinary func() be used as a FileOpsProcess.
type FileOpsProcessFn func()

// Start invokes the wrapped function directly on the calling goroutine. Any
// asynchrony expected of FileOpsProcess.Start must therefore come from the
// wrapped function itself.
func (fn FileOpsProcessFn) Start() {
	fn()
}
// databaseRepairer repairs in-memory database data. It is a BackgroundProcess
// that can also be asked to run a repair on demand.
type databaseRepairer interface {
	BackgroundProcess

	// Repair repairs in-memory data.
	Repair() error
}
// databaseTickManager performs periodic ticking.
type databaseTickManager interface {