@@ -5,12 +5,14 @@ import (
55 "encoding/binary"
66 "errors"
77 "fmt"
8+ "math"
89 "os"
910 "path/filepath"
1011 "sync"
1112
1213 "github.com/influxdata/influxdb/v2/logger"
1314 "github.com/influxdata/influxdb/v2/models"
15+ errors2 "github.com/influxdata/influxdb/v2/pkg/errors"
1416 "github.com/influxdata/influxdb/v2/pkg/limiter"
1517 "github.com/influxdata/influxdb/v2/pkg/rhh"
1618 "go.uber.org/zap"
@@ -87,7 +89,7 @@ func (p *SeriesPartition) Open() error {
8789 p .index = NewSeriesIndex (p .IndexPath ())
8890 if err := p .index .Open (); err != nil {
8991 return err
90- } else if err : = p .index .Recover (p .segments ); err != nil {
92+ } else if err = p .index .Recover (p .segments ); err != nil {
9193 return err
9294 }
9395
@@ -565,94 +567,108 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
565567 return nil
566568}
567569
568- func (c * SeriesPartitionCompactor ) compactIndexTo (index * SeriesIndex , seriesN uint64 , segments []* SeriesSegment , path string ) error {
569- hdr := NewSeriesIndexHeader ()
570- hdr .Count = seriesN
571- hdr .Capacity = pow2 ((int64 (hdr .Count ) * 100 ) / SeriesIndexLoadFactor )
572-
573- // Allocate space for maps.
574- keyIDMap := make ([]byte , (hdr .Capacity * SeriesIndexElemSize ))
575- idOffsetMap := make ([]byte , (hdr .Capacity * SeriesIndexElemSize ))
570+ var errDone error = errors .New ("done" )
576571
577- // Reindex all partitions.
578- var entryN int
579- for _ , segment := range segments {
580- errDone := errors .New ("done" )
572+ func (c * SeriesPartitionCompactor ) compactIndexTo (index * SeriesIndex , seriesN uint64 , segments []* SeriesSegment , path string ) (err error ) {
581573
582- if err := segment .ForEachEntry (func (flag uint8 , id uint64 , offset int64 , key []byte ) error {
574+ hdr := NewSeriesIndexHeader ()
575+ var keyIDMap []byte
576+ var idOffsetMap []byte
577+
578+ hdr .Count = math .MaxUint64
579+ // seriesN is the current size of the index. Because it may contain tombstones
580+ // for deleted series, we recalculate that number (as seriesCount) without the
581+ // deleted series as we rebuild the index. If the count of existing series does
582+ // not equal the seriesN passed in (meaning there were tombstones), we rebuild
583+ // the index a second time with the correct size.
584+ seriesCount := seriesN
585+ for {
586+ seriesN = seriesCount
587+ seriesCount = uint64 (0 )
588+ // This only loops if there are deleted entries, which shrinks the size
589+ hdr .Capacity = pow2 ((int64 (seriesN ) * 100 ) / SeriesIndexLoadFactor )
590+ // Allocate space for maps, guaranteeing slices are initialized to zero
591+ keyIDMap = make ([]byte , hdr .Capacity * SeriesIndexElemSize )
592+ idOffsetMap = make ([]byte , hdr .Capacity * SeriesIndexElemSize )
593+
594+ // Reindex all partitions.
595+ var entryN int
596+ for _ , segment := range segments {
597+
598+ if err = segment .ForEachEntry (func (flag uint8 , id uint64 , offset int64 , key []byte ) error {
599+
600+ // Make sure we don't go past the offset where the compaction began.
601+ if offset > index .maxOffset {
602+ return errDone
603+ }
583604
584- // Make sure we don't go past the offset where the compaction began.
585- if offset > index .maxOffset {
586- return errDone
587- }
605+ // Check for cancellation periodically.
606+ if entryN ++ ; entryN % 1000 == 0 {
607+ select {
608+ case <- c .cancel :
609+ return ErrSeriesPartitionCompactionCancelled
610+ default :
611+ }
612+ }
588613
589- // Check for cancellation periodically.
590- if entryN ++ ; entryN % 1000 == 0 {
591- select {
592- case <- c .cancel :
593- return ErrSeriesPartitionCompactionCancelled
614+ // Only process insert entries.
615+ switch flag {
616+ case SeriesEntryInsertFlag :
617+ // does not fallthrough
618+ case SeriesEntryTombstoneFlag :
619+ return nil
594620 default :
621+ return fmt .Errorf ("unexpected series partition log entry flag: %d" , flag )
595622 }
596- }
597-
598- // Only process insert entries.
599- switch flag {
600- case SeriesEntryInsertFlag : // fallthrough
601- case SeriesEntryTombstoneFlag :
602- return nil
603- default :
604- return fmt .Errorf ("unexpected series partition log entry flag: %d" , flag )
605- }
606623
607- // Save max series identifier processed.
608- hdr .MaxSeriesID , hdr .MaxOffset = id , offset
624+ // Save max series identifier processed.
625+ hdr .MaxSeriesID , hdr .MaxOffset = id , offset
609626
610- // Ignore entry if tombstoned.
611- if index .IsDeleted (id ) {
612- return nil
627+ // Ignore entry if tombstoned.
628+ if index .IsDeleted (id ) {
629+ return nil
630+ }
631+ seriesCount ++
632+ // Insert into maps.
633+ c .insertIDOffsetMap (idOffsetMap , hdr .Capacity , id , offset )
634+ return c .insertKeyIDMap (keyIDMap , hdr .Capacity , segments , key , offset , id )
635+ }); err == errDone {
636+ break
637+ } else if err != nil {
638+ return err
613639 }
614-
615- // Insert into maps.
616- c . insertIDOffsetMap ( idOffsetMap , hdr . Capacity , id , offset )
617- return c . insertKeyIDMap ( keyIDMap , hdr . Capacity , segments , key , offset , id )
618- }); err == errDone {
640+ }
641+ hdr . Count = seriesCount
642+ if seriesN != seriesCount {
643+ continue
644+ } else {
619645 break
620- } else if err != nil {
621- return err
622646 }
623647 }
624-
625648 // Open file handler.
626649 f , err := os .Create (path )
627650 if err != nil {
628651 return err
629652 }
630- defer f .Close ()
631-
653+ defer errors2 .Capture (& err , f .Close )()
632654 // Calculate map positions.
633655 hdr .KeyIDMap .Offset , hdr .KeyIDMap .Size = SeriesIndexHeaderSize , int64 (len (keyIDMap ))
634656 hdr .IDOffsetMap .Offset , hdr .IDOffsetMap .Size = hdr .KeyIDMap .Offset + hdr .KeyIDMap .Size , int64 (len (idOffsetMap ))
635657
636658 // Write header.
637- if _ , err : = hdr .WriteTo (f ); err != nil {
659+ if _ , err = hdr .WriteTo (f ); err != nil {
638660 return err
639661 }
640662
641663 // Write maps.
642- if _ , err : = f .Write (keyIDMap ); err != nil {
664+ if _ , err = f .Write (keyIDMap ); err != nil {
643665 return err
644666 } else if _ , err := f .Write (idOffsetMap ); err != nil {
645667 return err
646668 }
647669
648- // Sync & close.
649- if err := f .Sync (); err != nil {
650- return err
651- } else if err := f .Close (); err != nil {
652- return err
653- }
654-
655- return nil
670+ // Sync, then deferred close
671+ return f .Sync ()
656672}
657673
658674func (c * SeriesPartitionCompactor ) insertKeyIDMap (dst []byte , capacity int64 , segments []* SeriesSegment , key []byte , offset int64 , id uint64 ) error {
0 commit comments