forked from influxdata/telegraf
/
wal.go
1612 lines (1341 loc) · 45.1 KB
/
wal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Package wal implements a write ahead log optimized for write throughput
that can be put in front of the database index.
The WAL is broken into different partitions. The default number of
partitions is 5. Each partition consists of a number of segment files.
By default these files will get up to 2MB in size before a new segment
file is opened. The files are numbered and start at 1. The number
indicates the order in which the files should be read on startup to
ensure data is recovered in the same order it was written.
Partitions are flushed and compacted individually. One of the goals with
having multiple partitions was to be able to flush only a portion of the
WAL at a time.
The WAL does not flush everything in a partition when it comes time. It will
only flush series that are over a given threshold (32kb by default). The rest
will be written into a new segment file so they can be flushed later. This
is like a compaction in an LSM Tree.
*/
package wal
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/golang/snappy"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
)
const (
	// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over.
	DefaultSegmentSize = 2 * 1024 * 1024

	// FileExtension is the file extension we expect for wal segments.
	FileExtension = "wal"

	// MetaFileExtension is the file extension for the log files of new fields and measurements that get created.
	MetaFileExtension = "meta"

	// CompactionExtension is the file extension we expect for compaction files.
	CompactionExtension = "CPT"

	// MetaFlushInterval is the period after which any compressed meta data in the .meta file will get
	// flushed to the index.
	MetaFlushInterval = 10 * time.Minute

	// FailWriteMemoryThreshold will start returning errors on writes if the memory gets more
	// than this multiple above the maximum threshold. This is set to 5 because previously
	// the memory threshold was for 5 partitions, but when this was introduced the partition
	// count was reduced to 1 so we know that it can handle at least this much extra memory.
	FailWriteMemoryThreshold = 5

	// defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria.
	defaultFlushCheckInterval = time.Second
)
// Statistics maintained by the WAL. These are the keys published into the
// expvar-based stat map held by each Log.
const (
	statPointsWriteReq = "points_write_req" // number of WritePoints calls
	statPointsWrite    = "points_write"     // total points written
	statFlush          = "flush"            // explicit Flush() calls
	statAutoFlush      = "auto_flush"       // background flush checks
	statIdleFlush      = "idle_flush"       // flushes triggered by write inactivity
	statMetadataFlush  = "meta_flush"       // metadata file flushes to the index
	statThresholdFlush = "threshold_flush"  // flushes triggered by ready-series threshold
	statMemoryFlush    = "mem_flush"        // flushes triggered by memory pressure
	statSeriesFlushed  = "series_flush"     // series flushed to the index
	statPointsFlushed  = "points_flush"     // points flushed to the index
	statFlushDuration  = "flush_duration"   // cumulative flush time
	statWriteFail      = "write_fail"       // writes rejected due to memory pressure
	statMemorySize     = "mem_size"         // rough in-memory cache size
)
// flushType indicates why a flush and compaction are being run so the partition can
// do the appropriate type of compaction.
type flushType int

const (
	// noFlush indicates that no flush or compaction are necessary at this time.
	noFlush flushType = iota

	// memoryFlush indicates that we should look for the series using the most
	// memory to flush out and compact all others.
	memoryFlush

	// idleFlush indicates that we should flush all series in the partition,
	// delete all segment files and hold off on opening a new one.
	idleFlush

	// thresholdFlush indicates that we should flush all series over the ReadySize
	// and compact all other series.
	thresholdFlush

	// deleteFlush indicates that we're flushing because series need to be removed from the WAL.
	deleteFlush
)
var (
	// ErrCompactionRunning to return if we attempt to run a compaction on a partition that is currently running one.
	ErrCompactionRunning = errors.New("compaction running")

	// ErrMemoryCompactionDone gets returned if we called to flushAndCompact to free up memory
	// but a compaction has already been done to do so.
	ErrMemoryCompactionDone = errors.New("compaction already run to free up memory")

	// CompactSequence is the byte sequence within a segment file that has been compacted
	// that indicates the start of a compaction marker.
	CompactSequence = []byte{0xFF, 0xFF}
)
// Log is the write ahead log. Points are appended to compressed segment files
// and cached in memory until they are flushed to the Index; series/field
// metadata is journaled to a separate .meta file.
type Log struct {
	path string

	flush              chan int    // signals a background flush on the given partition
	flushLock          sync.Mutex  // serializes access to flushing to index
	flushCheckTimer    *time.Timer // check this often to see if a background flush should happen
	flushCheckInterval time.Duration

	// These coordinate closing and waiting for running goroutines.
	wg      sync.WaitGroup
	closing chan struct{}

	// LogOutput is the writer used by the logger.
	LogOutput io.Writer
	logger    *log.Logger

	mu        sync.RWMutex
	partition *Partition

	// metaFile is the file that compressed metadata like series and fields are written to
	metaFile *os.File

	// FlushColdInterval is the period of time after which a partition will do a
	// full flush and compaction if it has been cold for writes.
	FlushColdInterval time.Duration

	// SegmentSize is the file size at which a segment file will be rotated in a partition.
	SegmentSize int64

	// MaxSeriesSize controls when a partition should get flushed to index and compacted
	// if any series in the partition has exceeded this size threshold
	MaxSeriesSize int

	// ReadySeriesSize is the minimum size a series of points must get to before getting flushed.
	ReadySeriesSize int

	// CompactionThreshold controls when a partition will be flushed. Once this
	// percentage of series in a partition are ready, a flush and compaction will be triggered.
	CompactionThreshold float64

	// PartitionSizeThreshold specifies when a partition should be forced to be flushed.
	PartitionSizeThreshold uint64

	// Index is the database that series data gets flushed to once it gets compacted
	// out of the WAL.
	Index IndexWriter

	// LoggingEnabled specifies if detailed logs should be output
	LoggingEnabled bool

	// expvar-based statistics
	statMap *expvar.Map
}
// IndexWriter is an interface for the indexed database the WAL flushes data to.
type IndexWriter interface {
	// WriteIndex writes the given points, fields and series to the index.
	// Each byte slice in pointsByKey is a time-ascending marshaled entry:
	//   int64 time
	//   data
	WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
// NewLog returns a Log rooted at path, configured with the package defaults.
// Callers typically override the exported settings from config before Open.
func NewLog(path string) *Log {
	// Configure expvar monitoring. It's OK to do this even if the service fails to open and
	// should be done before any data could arrive for the service.
	statKey := strings.Join([]string{"wal", path}, ":")
	statTags := map[string]string{"path": path}

	l := &Log{
		path:  path,
		flush: make(chan int, 1),

		// these options should be overridden by any options in the config
		LogOutput:              os.Stderr,
		FlushColdInterval:      tsdb.DefaultFlushColdInterval,
		SegmentSize:            DefaultSegmentSize,
		MaxSeriesSize:          tsdb.DefaultMaxSeriesSize,
		CompactionThreshold:    tsdb.DefaultCompactionThreshold,
		PartitionSizeThreshold: tsdb.DefaultPartitionSizeThreshold,
		ReadySeriesSize:        tsdb.DefaultReadySeriesSize,
		flushCheckInterval:     defaultFlushCheckInterval,
		logger:                 log.New(os.Stderr, "[wal] ", log.LstdFlags),
		statMap:                influxdb.NewStatistics(statKey, "wal", statTags),
	}
	return l
}
// Open opens and initializes the Log. Will recover from previous unclosed shutdowns:
// leftover segment files are replayed into the index via openPartitionFile.
// It also starts the background autoflusher goroutine, which Close later stops.
func (l *Log) Open() error {
	if l.LoggingEnabled {
		l.logger.Printf("WAL starting with %d ready series size, %0.2f compaction threshold, and %d partition size threshold\n", l.ReadySeriesSize, l.CompactionThreshold, l.PartitionSizeThreshold)
		l.logger.Printf("WAL writing to %s\n", l.path)
	}
	if err := os.MkdirAll(l.path, 0777); err != nil {
		return err
	}

	// open the metafile for writing
	if err := l.nextMetaFile(); err != nil {
		return err
	}

	// open the partition (a single partition, id 1)
	p, err := NewPartition(uint8(1), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index, l.statMap)
	if err != nil {
		return err
	}
	p.log = l
	l.partition = p

	// replay any segment files left over from a previous run into the index
	if err := l.openPartitionFile(); err != nil {
		return err
	}

	l.flushCheckTimer = time.NewTimer(l.flushCheckInterval)

	// Start background goroutines.
	l.wg.Add(1)
	l.closing = make(chan struct{})
	go l.autoflusher(l.closing)

	return nil
}
// DiskSize returns the size reported by the OS for the WAL directory entry.
func (l *Log) DiskSize() (int64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()

	fi, err := os.Stat(l.path)
	if err != nil {
		return 0, err
	}
	return fi.Size(), nil
}
// Cursor will return a cursor object to Seek and iterate with Next for the WAL
// cache for the given series, fields and direction.
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
	l.mu.RLock()
	defer l.mu.RUnlock()

	c := l.partition.cursor(series, fields, dec, ascending)
	return c
}
// WritePoints writes the raw point data to the partition's segment file and
// in-memory cache, persisting any new series and field metadata first so the
// metadata is durable even if the point write fails.
func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
	l.statMap.Add(statPointsWriteReq, 1)
	l.statMap.Add(statPointsWrite, int64(len(points)))

	// persist the series and fields if there are any
	err := l.writeSeriesAndFields(fields, series)
	if err != nil {
		l.logger.Println("error writing series and fields:", err.Error())
		return err
	}

	// persist the raw point data
	return l.partition.Write(points)
}
// Flush will force a full flush of the partition to the index.
func (l *Log) Flush() error {
	l.statMap.Add(statFlush, 1)

	l.mu.RLock()
	defer l.mu.RUnlock()

	// an idleFlush drains every series and closes the segment files
	return l.partition.flushAndCompact(idleFlush)
}
// LoadMetadataIndex loads the new series and fields files into memory and flushes
// them to the BoltDB index. This function should be called before making a call
// to Open(). On success the consumed metadata files are removed from disk.
func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
	metaFiles, err := l.metadataFiles()
	if err != nil {
		return err
	}

	measurementFieldsToSave := make(map[string]*tsdb.MeasurementFields)
	seriesToCreate := make([]*tsdb.SeriesCreate, 0)

	// read all the metafiles off disk
	for _, fn := range metaFiles {
		a, err := l.readMetadataFile(fn)
		if err != nil {
			return err
		}

		// loop through the seriesAndFields and add them to the index and the collection to be written to the index
		for _, sf := range a {
			for k, mf := range sf.Fields {
				measurementFieldsToSave[k] = mf

				// k is already a string map key; no conversion needed
				m := index.CreateMeasurementIndexIfNotExists(k)
				for name := range mf.Fields {
					m.SetFieldName(name)
				}
				mf.Codec = tsdb.NewFieldCodec(mf.Fields)
				measurementFields[m.Name] = mf
			}

			for _, sc := range sf.Series {
				seriesToCreate = append(seriesToCreate, sc)

				sc.Series.InitializeShards()
				index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(sc.Series.Key)), sc.Series)
			}
		}
	}

	// persist everything recovered from the metafiles to the index
	if err := l.Index.WriteIndex(nil, measurementFieldsToSave, seriesToCreate); err != nil {
		return err
	}

	// now remove all the old metafiles
	for _, fn := range metaFiles {
		if err := os.Remove(fn); err != nil {
			return err
		}
	}

	return nil
}
// DeleteSeries will flush the metadata that is in the WAL to the index and remove
// all series specified from the cache and the segment files in each partition. This
// will block all writes while a compaction is done against all partitions. This function
// is meant to be called by bz1 BEFORE it updates its own index, since the metadata
// is flushed here first.
func (l *Log) DeleteSeries(keys []string) error {
	err := l.flushMetadata()
	if err != nil {
		return err
	}

	// we want to stop any writes from happening to ensure the data gets cleared
	l.mu.Lock()
	defer l.mu.Unlock()

	return l.partition.deleteSeries(keys)
}
// readMetadataFile will read the entire contents of the meta file and return a slice of the
// seriesAndFields objects that were written in. It ignores file errors since those can't be
// recovered. The file format is a sequence of [8-byte big-endian length][snappy-compressed
// JSON seriesAndFields] records; reading stops at EOF, a zero length, or a corrupt record.
func (l *Log) readMetadataFile(fileName string) ([]*seriesAndFields, error) {
	f, err := os.OpenFile(fileName, os.O_RDWR, 0666)
	if err != nil {
		return nil, err
	}

	a := make([]*seriesAndFields, 0)

	length := make([]byte, 8)
	for {
		// get the length of the compressed seriesAndFields blob
		_, err := io.ReadFull(f, length)
		if err == io.EOF {
			break
		} else if err != nil {
			f.Close()
			return nil, err
		}

		dataLength := btou64(length)
		if dataLength == 0 {
			// a zero length marks the logical end of the file
			break
		}

		// read in the compressed block and decode it
		b := make([]byte, dataLength)

		_, err = io.ReadFull(f, b)
		if err == io.EOF {
			break
		} else if err != nil {
			// print the error and move on since we can't recover the file
			// (BUG FIX: this path reads the metadata body, not its length)
			l.logger.Println("error reading metadata:", err.Error())
			break
		}

		buf, err := snappy.Decode(nil, b)
		if err != nil {
			// print the error and move on since we can't recover the file
			l.logger.Println("error reading compressed metadata info:", err.Error())
			break
		}

		sf := &seriesAndFields{}
		if err := json.Unmarshal(buf, sf); err != nil {
			// print the error and move on since we can't recover the file
			l.logger.Println("error unmarshaling json for new series and fields:", err.Error())
			break
		}

		a = append(a, sf)
	}

	if err := f.Close(); err != nil {
		return nil, err
	}

	return a, nil
}
// writeSeriesAndFields will write the compressed fields and series to the meta file. This file persists the data
// in case the server gets shutdown before the WAL has a chance to flush everything to the cache. By default this
// file is flushed on start when bz1 calls LoadMetaDataIndex.
func (l *Log) writeSeriesAndFields(fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
	// nothing new to persist
	if len(fields) == 0 && len(series) == 0 {
		return nil
	}

	data, err := json.Marshal(&seriesAndFields{Fields: fields, Series: series})
	if err != nil {
		return err
	}
	compressed := snappy.Encode(nil, data)

	l.mu.Lock()
	defer l.mu.Unlock()

	// length-prefix the compressed blob so readMetadataFile can frame it
	if _, err := l.metaFile.Write(u64tob(uint64(len(compressed)))); err != nil {
		return err
	}
	if _, err := l.metaFile.Write(compressed); err != nil {
		return err
	}

	return l.metaFile.Sync()
}
// nextMetaFile will close the current file if there is one open and open a new file to log
// metadata updates to. The new file name is the previous highest numbered meta file plus one.
// NOTE(review): this function acquires l.mu itself, so callers must NOT already hold the
// lock (the previous comment claiming the opposite was stale — both call sites, Open and
// flushMetadata, call it unlocked).
func (l *Log) nextMetaFile() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.metaFile != nil {
		if err := l.metaFile.Close(); err != nil {
			return err
		}
	}

	metaFiles, err := l.metadataFiles()
	if err != nil {
		return err
	}

	// pick an id one greater than the highest existing meta file number
	id := 0
	if len(metaFiles) > 0 {
		num := strings.Split(filepath.Base(metaFiles[len(metaFiles)-1]), ".")[0]
		n, err := strconv.ParseInt(num, 10, 32)
		if err != nil {
			return err
		}
		id = int(n) + 1
	}

	nextFileName := filepath.Join(l.path, fmt.Sprintf("%06d.%s", id, MetaFileExtension))
	l.metaFile, err = os.OpenFile(nextFileName, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		return err
	}

	return nil
}
// metadataFiles returns the files in the WAL directory with the
// MetaFileExtension, sorted in ascending (creation-order) name order.
func (l *Log) metadataFiles() ([]string, error) {
	pattern := filepath.Join(l.path, "*."+MetaFileExtension)

	names, err := filepath.Glob(pattern)
	if err != nil {
		return nil, err
	}
	sort.Strings(names)

	return names, nil
}
// openPartitionFile will open the partition and flush all segment files to the index.
// Each segment file is read, its entries are grouped by series key, written to the
// index, and the segment file is then deleted. Called from Open during recovery.
func (l *Log) openPartitionFile() error {
	// Recover from a partial compaction.
	if err := l.partition.recoverCompactionFile(); err != nil {
		return fmt.Errorf("recover compaction files: %s", err)
	}

	fileNames, err := l.partition.segmentFileNames()
	if err != nil {
		return err
	}
	if l.LoggingEnabled && len(fileNames) > 0 {
		l.logger.Println("reading WAL files to flush to index")
	}
	for _, n := range fileNames {
		entries, err := l.partition.readFile(n)
		if err != nil {
			return err
		}

		// group entries by series key for the index write
		seriesToFlush := make(map[string][][]byte)
		for _, e := range entries {
			seriesToFlush[string(e.key)] = append(seriesToFlush[string(e.key)], MarshalEntry(e.timestamp, e.data))
		}

		if l.LoggingEnabled {
			l.logger.Printf("writing %d series from WAL file %s to index\n", len(seriesToFlush), n)
		}
		if err := l.Index.WriteIndex(seriesToFlush, nil, nil); err != nil {
			return err
		}

		// the segment's data is now durable in the index; drop the file
		if err := os.Remove(n); err != nil {
			return err
		}
	}

	return nil
}
// Close will finish any flush that is currently in process and close file handles.
func (l *Log) Close() error {
	// stop the autoflushing process so it doesn't try to kick another one off
	l.mu.Lock()
	if l.closing != nil {
		close(l.closing)
		l.closing = nil
	}
	l.mu.Unlock()

	// Allow goroutines to finish running.
	l.wg.Wait()

	// Lock the remainder of the closing process.
	l.mu.Lock()
	defer l.mu.Unlock()

	// close partition and metafile
	return l.close()
}
// close shuts the open partition and the metafile handle. Callers must hold l.mu.
func (l *Log) close() error {
	err := l.partition.Close()
	if err != nil {
		// log and skip so we can close the other handles
		l.logger.Println("error closing partition:", err)
	}

	if err := l.metaFile.Close(); err != nil {
		return err
	}
	l.metaFile = nil

	return nil
}
// triggerAutoFlush will flush and compact the partition if it has hit a
// threshold for compaction (memory pressure or write inactivity).
func (l *Log) triggerAutoFlush() {
	l.statMap.Add(statAutoFlush, 1)

	l.mu.RLock()
	defer l.mu.RUnlock()

	ft := l.partition.shouldFlush()
	if ft == noFlush {
		return
	}
	if err := l.partition.flushAndCompact(ft); err != nil {
		l.logger.Printf("error flushing partition: %s\n", err)
	}
}
// autoflusher waits for notification of a flush and kicks it off in the background.
// This method runs in a separate goroutine started by Open; it exits when the
// closing channel is closed (see Close) and decrements l.wg on return.
func (l *Log) autoflusher(closing chan struct{}) {
	defer l.wg.Done()

	metaFlushTicker := time.NewTicker(MetaFlushInterval)

	for {
		// Wait for close or flush signal.
		select {
		case <-closing:
			metaFlushTicker.Stop()
			return
		case <-l.flushCheckTimer.C:
			// periodic check of the flush criteria; the timer is re-armed
			// after each check so triggers can't pile up
			l.triggerAutoFlush()
			l.flushCheckTimer.Reset(l.flushCheckInterval)
		case <-l.flush:
			// explicit flush request
			if err := l.Flush(); err != nil {
				l.logger.Println("flush error:", err)
			}
		case <-metaFlushTicker.C:
			// push accumulated series/field metadata to the index
			if err := l.flushMetadata(); err != nil {
				l.logger.Println("metadata flush error:", err)
			}
		}
	}
}
// flushMetadata will start a new metafile for writes to go through and then flush all
// metadata from previous files to the index. After a successful write, the metadata files
// will be removed. While the flush to index is happening we aren't blocked for new metadata writes.
func (l *Log) flushMetadata() error {
	l.statMap.Add(statMetadataFlush, 1)

	// make sure there's actually something in the metadata file to flush
	// (l.mu is held only for the stat call, so writes are barely blocked)
	size, err := func() (int64, error) {
		l.mu.Lock()
		defer l.mu.Unlock()

		if l.metaFile == nil {
			return 0, nil
		}
		st, err := l.metaFile.Stat()
		if err != nil {
			return 0, err
		}
		return st.Size(), nil
	}()
	if err != nil {
		return err
	} else if size == 0 {
		return nil
	}

	// we have data, get a list of the existing files and rotate to a new one, then flush
	files, err := l.metadataFiles()
	if err != nil {
		return err
	}

	// rotate first so new metadata writes land in a fresh file while we flush the old ones
	if err := l.nextMetaFile(); err != nil {
		return err
	}

	measurements := make(map[string]*tsdb.MeasurementFields)
	series := make([]*tsdb.SeriesCreate, 0)

	// read all the measurement fields and series from the metafiles
	for _, fn := range files {
		a, err := l.readMetadataFile(fn)
		if err != nil {
			return err
		}

		for _, sf := range a {
			for k, mf := range sf.Fields {
				measurements[k] = mf
			}

			series = append(series, sf.Series...)
		}
	}

	// Lock before flushing to ensure timing is not spent waiting for lock.
	l.flushLock.Lock()
	defer l.flushLock.Unlock()

	startTime := time.Now()
	if l.LoggingEnabled {
		l.logger.Printf("Flushing %d measurements and %d series to index\n", len(measurements), len(series))
	}
	// write them to the index
	if err := l.Index.WriteIndex(nil, measurements, series); err != nil {
		return err
	}
	if l.LoggingEnabled {
		l.logger.Println("Metadata flush took", time.Since(startTime))
	}

	// remove the old files now that we've persisted them elsewhere
	for _, fn := range files {
		if err := os.Remove(fn); err != nil {
			return err
		}
	}

	return nil
}
// Partition is a set of files for a partition of the WAL. We use multiple partitions so when compactions occur
// only a portion of the WAL must be flushed and compacted.
type Partition struct {
	id                 uint8
	path               string
	mu                 sync.RWMutex
	currentSegmentFile *os.File
	currentSegmentSize int64
	currentSegmentID   uint32
	lastFileID         uint32
	maxSegmentSize     int64
	cache              map[string]*cacheEntry
	index              IndexWriter
	readySeriesSize    int

	// memorySize is the rough size in memory of all the cached series data
	memorySize uint64

	// sizeThreshold is the memory size after which writes start getting throttled
	sizeThreshold uint64

	// flushCache is a temporary placeholder to keep data while its being flushed
	// and compacted. It's for cursors to combine the cache and this if a flush is occurring
	flushCache        map[string][][]byte
	compactionRunning bool

	// flushColdInterval and lastWriteTime are used to determine if a partition should
	// be flushed because it has been idle for writes.
	flushColdInterval time.Duration
	lastWriteTime     time.Time

	log     *Log
	statMap *expvar.Map

	// Used for mocking OS calls in tests
	os struct {
		OpenCompactionFile func(name string, flag int, perm os.FileMode) (file *os.File, err error)
		OpenSegmentFile    func(name string, flag int, perm os.FileMode) (file *os.File, err error)
		Rename             func(oldpath, newpath string) error
	}

	// buffers for reading and writing compressed blocks
	// We constrain blocks so that we can read and write into a partition
	// without allocating
	buf       []byte
	snappybuf []byte
}
// partitionBufLen is the fixed size (16 KB) of the reusable scratch buffers
// used to marshal and snappy-compress blocks of points in Partition.Write.
const partitionBufLen = 16 << 10 // 16kb
// NewPartition returns a Partition rooted at path with the given size and flush
// settings. The OS hooks default to the real os package functions; tests may
// replace them.
func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int,
	flushColdInterval time.Duration, index IndexWriter, statMap *expvar.Map) (*Partition, error) {

	p := &Partition{
		id:                id,
		path:              path,
		maxSegmentSize:    segmentSize,
		sizeThreshold:     sizeThreshold,
		lastWriteTime:     time.Now(),
		cache:             make(map[string]*cacheEntry),
		readySeriesSize:   readySeriesSize,
		index:             index,
		flushColdInterval: flushColdInterval,
		statMap:           statMap,

		// fixed-size scratch buffers so Write can marshal and compress
		// without per-call allocation
		buf: make([]byte, partitionBufLen),
	}
	p.snappybuf = make([]byte, snappy.MaxEncodedLen(partitionBufLen))

	// default the OS hooks to the real implementations
	p.os.OpenCompactionFile = os.OpenFile
	p.os.OpenSegmentFile = os.OpenFile
	p.os.Rename = os.Rename

	return p, nil
}
// Close resets the caches and closes the currently open segment file.
// Safe to call on a nil receiver.
func (p *Partition) Close() error {
	if p == nil {
		return nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	p.cache = nil
	if p.currentSegmentFile == nil {
		return nil
	}

	// only clear the handle once the close has succeeded
	if err := p.currentSegmentFile.Close(); err != nil {
		return err
	}
	p.currentSegmentFile = nil

	return nil
}
// Write will write a compressed block of the points to the current segment file. If the segment
// file is larger than the max size, it will roll over to a new file before performing the write.
// This method will also add the points to the in memory cache.
//
// If the cache is over the memory threshold a background compaction is kicked off
// (or, if one is already running and memory keeps growing, the write is rejected).
func (p *Partition) Write(points []models.Point) error {
	// Check if we should compact due to memory pressure and if we should fail the write if
	// we're way too far over the threshold.
	if shouldFailWrite, shouldCompact := func() (shouldFailWrite bool, shouldCompact bool) {
		p.mu.RLock()
		defer p.mu.RUnlock()
		// Return an error if memory threshold has been reached.
		if p.memorySize > p.sizeThreshold {
			if !p.compactionRunning {
				shouldCompact = true
			} else if p.memorySize > p.sizeThreshold*FailWriteMemoryThreshold {
				shouldFailWrite = true
			}
		}
		return
	}(); shouldCompact {
		// NOTE(review): error from the background compaction is dropped here
		go p.flushAndCompact(memoryFlush)
	} else if shouldFailWrite {
		p.statMap.Add(statWriteFail, 1)
		return fmt.Errorf("write throughput too high. backoff and retry")
	}
	p.mu.Lock()
	defer p.mu.Unlock()

	// points are written in chunks that fit the fixed marshal buffer
	remainingPoints := points
	for len(remainingPoints) > 0 {
		block := bytes.NewBuffer(p.buf[:0])
		var i int
		for i = 0; i < len(remainingPoints); i++ {
			pp := remainingPoints[i]
			n := walEntryLength(pp)

			// we might have a single point which is larger than the buffer
			// If this is the case, then marshal it anyway and fall back to
			// slice allocation. The appends below should handle it.
			if block.Len()+n > partitionBufLen && i > 0 {
				break
			}
			marshalWALEntry(block, pp.Key(), pp.UnixNano(), pp.Data())
		}
		marshaledPoints := remainingPoints[:i]
		remainingPoints = remainingPoints[i:]
		b := snappy.Encode(p.snappybuf[:], block.Bytes())

		// rotate to a new file if we've gone over our limit
		if p.currentSegmentFile == nil || p.currentSegmentSize > p.maxSegmentSize {
			err := p.newSegmentFile()
			if err != nil {
				return err
			}
		}

		// each on-disk block is an 8-byte length prefix followed by the snappy data
		if n, err := p.currentSegmentFile.Write(u64tob(uint64(len(b)))); err != nil {
			return err
		} else if n != 8 {
			return fmt.Errorf("expected to write %d bytes but wrote %d", 8, n)
		}

		if n, err := p.currentSegmentFile.Write(b); err != nil {
			return err
		} else if n != len(b) {
			return fmt.Errorf("expected to write %d bytes but wrote %d", len(b), n)
		}

		p.currentSegmentSize += int64(8 + len(b))
		p.lastWriteTime = time.Now()

		// cache only after the chunk has been durably appended to the segment
		for _, pp := range marshaledPoints {
			p.addToCache(pp.Key(), pp.Data(), pp.UnixNano())
		}
	}
	return p.currentSegmentFile.Sync()
}
// newSegmentFile will close the current segment file and open a new one,
// updating bookkeeping info on the partition.
func (p *Partition) newSegmentFile() error {
	p.currentSegmentID++

	if p.currentSegmentFile != nil {
		if err := p.currentSegmentFile.Close(); err != nil {
			return err
		}
	}

	name := p.fileNameForSegment(p.currentSegmentID)
	f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		return err
	}

	p.currentSegmentSize = 0
	p.currentSegmentFile = f
	return nil
}
// fileNameForSegment will return the full path and filename for a given segment ID.
func (p *Partition) fileNameForSegment(id uint32) string {
	base := fmt.Sprintf("%02d.%06d.%s", p.id, id, FileExtension)
	return filepath.Join(p.path, base)
}
// compactionFileName is the name of the temporary file used for compaction.
func (p *Partition) compactionFileName() string {
	base := fmt.Sprintf("%02d.%06d.%s", p.id, 1, CompactionExtension)
	return filepath.Join(p.path, base)
}
// fileIDFromName will return the segment ID from the file name, which is
// expected to look like "<partition>.<id>.<ext>".
func (p *Partition) fileIDFromName(name string) (uint32, error) {
	fields := strings.Split(filepath.Base(name), ".")
	if len(fields) != 3 {
		return 0, fmt.Errorf("file name doesn't follow wal format: %s", name)
	}

	n, err := strconv.ParseUint(fields[1], 10, 32)
	if err != nil {
		return 0, err
	}
	return uint32(n), nil
}
// shouldFlush returns a flushType that indicates if a partition should be flushed and why. If the
// partition hasn't received a write in a configurable amount of time it will flush or if the
// size of the in memory cache is too large it will flush.
func (p *Partition) shouldFlush() flushType {
	// This method only reads partition state, so a read lock is sufficient;
	// the previous exclusive Lock needlessly blocked concurrent readers.
	p.mu.RLock()
	defer p.mu.RUnlock()

	if len(p.cache) == 0 {
		return noFlush
	}

	if p.memorySize > p.sizeThreshold {
		return memoryFlush
	}

	if time.Since(p.lastWriteTime) > p.flushColdInterval {
		return idleFlush
	}

	return noFlush
}
// prepareSeriesToFlush will empty the cache of series and return compaction information:
// the drained series data, its total size, and the segment ID below which files should
// be compacted. Marks the partition as having a compaction in progress.
func (p *Partition) prepareSeriesToFlush(readySeriesSize int, flush flushType) (*compactionInfo, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// if there is either a compaction running or one just ran and relieved
	// memory pressure, just return from here
	if p.compactionRunning {
		return nil, ErrCompactionRunning
	} else if flush == memoryFlush && p.memorySize < p.sizeThreshold {
		return nil, ErrMemoryCompactionDone
	}
	p.compactionRunning = true

	// drain the entire cache; everything in it gets flushed or compacted
	var size int
	for _, c := range p.cache {
		size += c.size
	}
	seriesToFlush := make(map[string][][]byte, len(p.cache))
	for k, c := range p.cache {
		seriesToFlush[k] = c.points
	}
	p.cache = make(map[string]*cacheEntry)

	c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size}

	if flush == idleFlush {
		// don't create a new segment file because this partition is idle
		if p.currentSegmentFile != nil {
			if err := p.currentSegmentFile.Close(); err != nil {
				return nil, err
			}
		}
		p.currentSegmentFile = nil
		p.currentSegmentID++
		p.currentSegmentSize = 0
	} else {
		// roll over a new segment file so we can compact all the old ones
		if err := p.newSegmentFile(); err != nil {
			return nil, err
		}
	}

	// keep the drained data visible to cursors while the flush is in progress
	p.flushCache = c.seriesToFlush
	c.compactFilesLessThan = p.currentSegmentID
	// BUG FIX: p.cache was reset above, so len(p.cache) was always 0 here;
	// count the series actually being flushed/compacted instead.
	c.countCompacting = len(c.seriesToFlush)

	return c, nil
}
// flushAndCompact will flush any series that are over their threshold and then read in all old segment files and
// write the data that was not flushed to a new file
func (p *Partition) flushAndCompact(flush flushType) error {
c, err := p.prepareSeriesToFlush(p.readySeriesSize, flush)