Skip to content

Commit

Permalink
Merge pull request #9918 from influxdata/bj-tsm-open-limiter
Browse files Browse the repository at this point in the history
TSM1 Open Limiter
  • Loading branch information
benbjohnson authored and jacobmarble committed Jun 26, 2018
1 parent 30cb7aa commit 507c094
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 0 deletions.
3 changes: 3 additions & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ type EngineOptions struct {
ShardID uint64
InmemIndex interface{} // shared in-memory index

// Limits the concurrent number of TSM files that can be loaded at once.
OpenLimiter limiter.Fixed

CompactionPlannerCreator CompactionPlannerCreator
CompactionLimiter limiter.Fixed
CompactionThroughputLimiter limiter.Rate
Expand Down
1 change: 1 addition & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
}

fs := NewFileStore(path)
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}
Expand Down
9 changes: 9 additions & 0 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ type FileStore struct {

files []TSMFile

openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.

logger *zap.Logger // Logger to be used for important messages
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
Expand Down Expand Up @@ -217,6 +219,7 @@ func NewFileStore(dir string) *FileStore {
lastModified: time.Time{},
logger: logger,
traceLogger: logger,
openLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
stats: &FileStoreStatistics{},
purger: &purger{
files: map[string]TSMFile{},
Expand Down Expand Up @@ -500,6 +503,12 @@ func (f *FileStore) Open() error {
}

go func(idx int, file *os.File) {
// Ensure a limited number of TSM files are loaded at once.
// Systems which have very large datasets (1TB+) can have thousands
// of TSM files which can cause extremely long load times.
f.openLimiter.Take()
defer f.openLimiter.Release()

start := time.Now()
df, err := NewTSMReader(file)
f.logger.Info("Opened file",
Expand Down
3 changes: 3 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func (s *Store) loadShards() error {
err error
}

// Limit the number of concurrent TSM files to be opened to the number of cores.
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))

// Setup a shared limiter for compactions
lim := s.EngineOptions.Config.MaxConcurrentCompactions
if lim == 0 {
Expand Down

0 comments on commit 507c094

Please sign in to comment.