Skip to content

Commit

Permalink
Filter out series within shards that do not have data for that series
Browse files Browse the repository at this point in the history
Previously, we would return a full tag set for every shard and the tag
set would include all series that existed in the database index
including series that didn't physically exist within that shard. This
led to the tag sets returned being incredibly huge when we had high
cardinality but sparse data. Since the data was sparse, it was
unexpected that it would cause such a large strain on the system by most
people.

Now we filter out the series ids that are not assigned to the current
shard when computing a tag set for that shard. This lowers the memory
usage for high cardinality sparse data drastically and allows queries on
those to complete successfully.

This does not resolve issues for high cardinality data in every shard
that is also spread out over a long series of time. That situation isn't
nearly as common as the above situation though.
  • Loading branch information
jsternberg committed Oct 20, 2016
1 parent 6fd74a6 commit 3681bc8
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent.
- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time.
- [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors.
- [#7496](https://github.com/influxdata/influxdb/pull/7496): Filter out series within shards that do not have data for that series.

### Bugfixes

Expand Down
9 changes: 5 additions & 4 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
)

// NewEngineFunc creates a new engine.
type NewEngineFunc func(path string, walPath string, options EngineOptions) Engine
type NewEngineFunc func(id uint64, path string, walPath string, options EngineOptions) Engine

// newEngineFuncs is a lookup of engine constructors by name.
var newEngineFuncs = make(map[string]NewEngineFunc)
Expand All @@ -87,10 +87,10 @@ func RegisteredEngines() []string {

// NewEngine returns an instance of an engine based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewEngine(path string, walPath string, options EngineOptions) (Engine, error) {
func NewEngine(id uint64, path string, walPath string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[options.EngineVersion](path, walPath, options), nil
return newEngineFuncs[options.EngineVersion](id, path, walPath, options), nil
}

// If it's a dir then it's a tsm1 engine
Expand All @@ -109,12 +109,13 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro
return nil, fmt.Errorf("invalid engine format: %q", format)
}

return fn(path, walPath, options), nil
return fn(id, path, walPath, options), nil
}

// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
ShardID uint64

Config Config
}
Expand Down
6 changes: 4 additions & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Engine struct {
snapDone chan struct{} // channel to signal snapshot compactions to stop
snapWG sync.WaitGroup // waitgroup for running snapshot compactions

id uint64
path string
logger *log.Logger // Logger to be used for important messages
traceLogger *log.Logger // Logger to be used when trace-logging is on.
Expand Down Expand Up @@ -125,7 +126,7 @@ type Engine struct {
}

// NewEngine returns a new instance of Engine.
func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewWAL(walPath)
fs := NewFileStore(path)
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)
Expand All @@ -136,6 +137,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
}

e := &Engine{
id: id,
path: path,
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
traceLogger: log.New(ioutil.Discard, "[tsm1] ", log.LstdFlags),
Expand Down Expand Up @@ -1268,7 +1270,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo

for _, mm := range mms {
// Determine tagsets for this measurement based on dimensions and filters.
tagSets, err := mm.TagSets(opt.Dimensions, opt.Condition)
tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition)
if err != nil {
return err
}
Expand Down
35 changes: 24 additions & 11 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestEngine_Backup(t *testing.T) {
p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")

// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)

// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
Expand Down Expand Up @@ -222,7 +222,9 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -275,7 +277,9 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -328,7 +332,9 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -382,7 +388,9 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -437,7 +445,9 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`,
Expand Down Expand Up @@ -497,7 +507,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`,
Expand Down Expand Up @@ -560,7 +572,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")

// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)

// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
Expand Down Expand Up @@ -721,7 +733,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
// Initialize metadata.
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

// Generate time ascending points with jitterred time & value.
rand := rand.New(rand.NewSource(0))
Expand Down Expand Up @@ -770,7 +783,7 @@ func NewEngine() *Engine {
panic(err)
}
return &Engine{
Engine: tsm1.NewEngine(
Engine: tsm1.NewEngine(1,
filepath.Join(root, "data"),
filepath.Join(root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine),
Expand Down Expand Up @@ -802,7 +815,7 @@ func (e *Engine) Reopen() error {
return err
}

e.Engine = tsm1.NewEngine(
e.Engine = tsm1.NewEngine(1,
filepath.Join(e.root, "data"),
filepath.Join(e.root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine)
Expand Down
5 changes: 4 additions & 1 deletion tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
m.mu.RLock()

// get the unique set of series ids and the filters that should be applied to each
Expand All @@ -787,6 +787,9 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
tagSets := make(map[string]*influxql.TagSet, 64)
for _, id := range ids {
s := m.seriesByID[id]
if !s.Assigned(shardID) {
continue
}
tags := make(map[string]string, len(dimensions))

// Build the TagSet for this series.
Expand Down
2 changes: 1 addition & 1 deletion tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (s *Shard) Open() error {
}

// Initialize underlying engine.
e, err := NewEngine(s.path, s.walPath, s.options)
e, err := NewEngine(s.id, s.path, s.walPath, s.options)
if err != nil {
return err
}
Expand Down

0 comments on commit 3681bc8

Please sign in to comment.