diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index a51150e0b67..4dc4d8d7257 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -196,6 +196,8 @@ func (s *Service) Run(database, name string, t time.Time) error { // backgroundLoop runs on a go routine and periodically executes CQs. func (s *Service) backgroundLoop() { leaseName := "continuous_querier" + t := time.NewTimer(s.RunInterval) + defer t.Stop() defer s.wg.Done() for { select { @@ -210,13 +212,15 @@ func (s *Service) backgroundLoop() { s.Logger.Printf("running continuous queries by request for time: %v", req.Now) s.runContinuousQueries(req) } - case <-time.After(s.RunInterval): + case <-t.C: if !s.hasContinuousQueries() { + t.Reset(s.RunInterval) continue } if _, err := s.MetaClient.AcquireLease(leaseName); err == nil { s.runContinuousQueries(&RunRequest{Now: time.Now()}) } + t.Reset(s.RunInterval) } } } diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 937bc1121b0..2dd6b358f5e 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -683,13 +683,13 @@ func (c *Cache) updateMemSize(b int64) { func valueType(v Value) int { switch v.(type) { - case *FloatValue: + case FloatValue: return 1 - case *IntegerValue: + case IntegerValue: return 2 - case *StringValue: + case StringValue: return 3 - case *BooleanValue: + case BooleanValue: return 4 default: return 0 diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 94cab317a78..bf1ee424101 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -73,6 +73,13 @@ type DefaultPlanner struct { // lastPlanCheck is the last time Plan was called lastPlanCheck time.Time + + mu sync.RWMutex + // lastFindGenerations is the last time findGenerations was run + lastFindGenerations time.Time + + // lastGenerations is the last set of generations found by findGenerations + lastGenerations tsmGenerations } // tsmGeneration represents the TSM files within a generation. @@ -458,6 +465,16 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { // findGenerations groups all the TSM files by they generation based // on their filename then returns the generations in descending order (newest first) func (c *DefaultPlanner) findGenerations() tsmGenerations { + c.mu.RLock() + last := c.lastFindGenerations + lastGen := c.lastGenerations + c.mu.RUnlock() + + if !last.IsZero() && c.FileStore.LastModified().Equal(last) { + return lastGen + } + + genTime := c.FileStore.LastModified() tsmStats := c.FileStore.Stats() generations := make(map[int]*tsmGeneration, len(tsmStats)) for _, f := range tsmStats { @@ -477,7 +494,15 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations { for _, g := range generations { orderedGenerations = append(orderedGenerations, g) } - sort.Sort(orderedGenerations) + if !orderedGenerations.IsSorted() { + sort.Sort(orderedGenerations) + } + + c.mu.Lock() + c.lastFindGenerations = genTime + c.lastGenerations = orderedGenerations + c.mu.Unlock() + return orderedGenerations } @@ -1323,3 +1348,16 @@ func (a tsmGenerations) chunk(size int) []tsmGenerations { } return chunks } + +func (a tsmGenerations) IsSorted() bool { + if len(a) == 1 { + return true + } + + for i := 1; i < len(a); i++ { + if a.Less(i, i-1) { + return false + } + } + return true +} diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index bbf85de5d0e..2ce815003fa 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -100,31 +100,31 @@ type Value interface { func NewValue(t int64, value interface{}) Value { switch v := value.(type) { case int64: - return &IntegerValue{unixnano: t, value: v} + return IntegerValue{unixnano: t, value: v} case float64: - return &FloatValue{unixnano: t, value: v} + return FloatValue{unixnano: t, value: v} case bool: - return &BooleanValue{unixnano: t, value: v} + return BooleanValue{unixnano: t, value: v} case string: - return &StringValue{unixnano: t, value: v} + return StringValue{unixnano: t, value: v} } return EmptyValue{} } func NewIntegerValue(t int64, v int64) Value { - return &IntegerValue{unixnano: t, value: v} + return IntegerValue{unixnano: t, value: v} } func NewFloatValue(t int64, v float64) Value { - return &FloatValue{unixnano: t, value: v} + return FloatValue{unixnano: t, value: v} } func NewBooleanValue(t int64, v bool) Value { - return &BooleanValue{unixnano: t, value: v} + return BooleanValue{unixnano: t, value: v} } func NewStringValue(t int64, v string) Value { - return &StringValue{unixnano: t, value: v} + return StringValue{unixnano: t, value: v} } type EmptyValue struct{} @@ -134,11 +134,11 @@ func (e EmptyValue) Value() interface{} { return nil } func (e EmptyValue) Size() int { return 0 } func (e EmptyValue) String() string { return "" } -func (_ EmptyValue) internalOnly() {} -func (_ *StringValue) internalOnly() {} -func (_ *IntegerValue) internalOnly() {} -func (_ *BooleanValue) internalOnly() {} -func (_ *FloatValue) internalOnly() {} +func (_ EmptyValue) internalOnly() {} +func (_ StringValue) internalOnly() {} +func (_ IntegerValue) internalOnly() {} +func (_ BooleanValue) internalOnly() {} +func (_ FloatValue) internalOnly() {} // Encode converts the values to a byte slice. If there are no values, // this function panics. @@ -148,13 +148,13 @@ func (a Values) Encode(buf []byte) ([]byte, error) { } switch a[0].(type) { - case *FloatValue: + case FloatValue: return encodeFloatBlock(buf, a) - case *IntegerValue: + case IntegerValue: return encodeIntegerBlock(buf, a) - case *BooleanValue: + case BooleanValue: return encodeBooleanBlock(buf, a) - case *StringValue: + case StringValue: return encodeStringBlock(buf, a) } @@ -168,13 +168,13 @@ func (a Values) InfluxQLType() (influxql.DataType, error) { } switch a[0].(type) { - case *FloatValue: + case FloatValue: return influxql.Float, nil - case *IntegerValue: + case IntegerValue: return influxql.Integer, nil - case *BooleanValue: + case BooleanValue: return influxql.Boolean, nil - case *StringValue: + case StringValue: return influxql.String, nil } @@ -225,7 +225,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err case BlockInteger: @@ -235,7 +235,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err @@ -246,7 +246,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err @@ -257,7 +257,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) { vals = make([]Value, len(decoded)) } for i := range decoded { - vals[i] = &decoded[i] + vals[i] = decoded[i] } return vals[:len(decoded)], err @@ -271,19 +271,19 @@ type FloatValue struct { value float64 } -func (f *FloatValue) UnixNano() int64 { +func (f FloatValue) UnixNano() int64 { return f.unixnano } -func (f *FloatValue) Value() interface{} { +func (f FloatValue) Value() interface{} { return f.value } -func (f *FloatValue) Size() int { +func (f FloatValue) Size() int { return 16 } -func (f *FloatValue) String() string { +func (f FloatValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.value) } @@ -306,7 +306,7 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { err := func() error { for _, v := range values { tsenc.Write(v.UnixNano()) - venc.Push(v.(*FloatValue).value) + venc.Push(v.(FloatValue).value) } venc.Finish() @@ -398,19 +398,19 @@ type BooleanValue struct { value bool } -func (b *BooleanValue) Size() int { +func (b BooleanValue) Size() int { return 9 } -func (b *BooleanValue) UnixNano() int64 { +func (b BooleanValue) UnixNano() int64 { return b.unixnano } -func (b *BooleanValue) Value() interface{} { +func (b BooleanValue) Value() interface{} { return b.value } -func (f *BooleanValue) String() string { +func (f BooleanValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value()) } @@ -430,7 +430,7 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) { err := func() error { for _, v := range values { tsenc.Write(v.UnixNano()) - venc.Write(v.(*BooleanValue).value) + venc.Write(v.(BooleanValue).value) } // Encoded timestamp values @@ -516,19 +516,19 @@ type IntegerValue struct { value int64 } -func (v *IntegerValue) Value() interface{} { +func (v IntegerValue) Value() interface{} { return v.value } -func (v *IntegerValue) UnixNano() int64 { +func (v IntegerValue) UnixNano() int64 { return v.unixnano } -func (v *IntegerValue) Size() int { +func (v IntegerValue) Size() int { return 16 } -func (f *IntegerValue) String() string { +func (f IntegerValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value()) } @@ -540,7 +540,7 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) { err := func() error { for _, v := range values { tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*IntegerValue).value) + vEnc.Write(v.(IntegerValue).value) } // Encoded timestamp values @@ -626,31 +626,31 @@ type StringValue struct { value string } -func (v *StringValue) Value() interface{} { +func (v StringValue) Value() interface{} { return v.value } -func (v *StringValue) UnixNano() int64 { +func (v StringValue) UnixNano() int64 { return v.unixnano } -func (v *StringValue) Size() int { +func (v StringValue) Size() int { return 8 + len(v.value) } -func (f *StringValue) String() string { +func (f StringValue) String() string { return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value()) } func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { tsEnc := getTimeEncoder(len(values)) - vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value)) + vEnc := getStringEncoder(len(values) * len(values[0].(StringValue).value)) var b []byte err := func() error { for _, v := range values { tsEnc.Write(v.UnixNano()) - vEnc.Write(v.(*StringValue).value) + vEnc.Write(v.(StringValue).value) } // Encoded timestamp values diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go index b4b384da1c8..07e36b124cb 100644 --- a/tsdb/engine/tsm1/encoding_test.go +++ b/tsdb/engine/tsm1/encoding_test.go @@ -503,7 +503,7 @@ func TestValues_MergeFloat(t *testing.T) { func TestIntegerValues_Merge(t *testing.T) { integerValue := func(t int64, f int64) tsm1.IntegerValue { - return *(tsm1.NewValue(t, f).(*tsm1.IntegerValue)) + return tsm1.NewValue(t, f).(tsm1.IntegerValue) } tests := []struct { @@ -636,7 +636,7 @@ func TestIntegerValues_Merge(t *testing.T) { func TestFloatValues_Merge(t *testing.T) { floatValue := func(t int64, f float64) tsm1.FloatValue { - return *(tsm1.NewValue(t, f).(*tsm1.FloatValue)) + return tsm1.NewValue(t, f).(tsm1.FloatValue) } tests := []struct { @@ -765,7 +765,7 @@ func TestFloatValues_Merge(t *testing.T) { func TestBooleanValues_Merge(t *testing.T) { booleanValue := func(t int64, f bool) tsm1.BooleanValue { - return *(tsm1.NewValue(t, f).(*tsm1.BooleanValue)) + return tsm1.NewValue(t, f).(tsm1.BooleanValue) } tests := []struct { @@ -894,7 +894,7 @@ func TestBooleanValues_Merge(t *testing.T) { func TestStringValues_Merge(t *testing.T) { stringValue := func(t int64, f string) tsm1.StringValue { - return *(tsm1.NewValue(t, f).(*tsm1.StringValue)) + return tsm1.NewValue(t, f).(tsm1.StringValue) } tests := []struct { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cc757933322..570fcd514c5 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -930,12 +930,14 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // compactCache continually checks if the WAL cache should be written to disk func (e *Engine) compactCache(quit <-chan struct{}) { + t := time.NewTicker(time.Second) + defer t.Stop() for { select { case <-quit: return - default: + case <-t.C: e.Cache.UpdateAge() if e.ShouldCompactCache(e.WAL.LastWriteTime()) { start := time.Now() @@ -950,7 +952,6 @@ func (e *Engine) compactCache(quit <-chan struct{}) { atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds()) } } - time.Sleep(time.Second) } } @@ -968,37 +969,38 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { + t := time.NewTicker(time.Second) + defer t.Stop() + for { select { case <-quit: return - default: + case <-t.C: s := e.levelCompactionStrategy(fast, level) - if s == nil { - time.Sleep(time.Second) - continue + if s != nil { + s.Apply() } - - s.Apply() } } } func (e *Engine) compactTSMFull(quit <-chan struct{}) { + t := time.NewTicker(time.Second) + defer t.Stop() + for { select { case <-quit: return - default: + case <-t.C: s := e.fullCompactionStrategy() - if s == nil { - time.Sleep(time.Second) - continue + if s != nil { + s.Apply() } - s.Apply() } } } diff --git a/tsdb/engine/tsm1/iterator.gen.go b/tsdb/engine/tsm1/iterator.gen.go index 762a0467996..66c7c8470c8 100644 --- a/tsdb/engine/tsm1/iterator.gen.go +++ b/tsdb/engine/tsm1/iterator.gen.go @@ -343,7 +343,7 @@ func (c *floatAscendingCursor) peekCache() (t int64, v float64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*FloatValue).value + return item.UnixNano(), item.(FloatValue).value } // peekTSM returns the current time/value from tsm. @@ -463,7 +463,7 @@ func (c *floatDescendingCursor) peekCache() (t int64, v float64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*FloatValue).value + return item.UnixNano(), item.(FloatValue).value } // peekTSM returns the current time/value from tsm. @@ -786,7 +786,7 @@ func (c *integerAscendingCursor) peekCache() (t int64, v int64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*IntegerValue).value + return item.UnixNano(), item.(IntegerValue).value } // peekTSM returns the current time/value from tsm. @@ -906,7 +906,7 @@ func (c *integerDescendingCursor) peekCache() (t int64, v int64) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*IntegerValue).value + return item.UnixNano(), item.(IntegerValue).value } // peekTSM returns the current time/value from tsm. @@ -1229,7 +1229,7 @@ func (c *stringAscendingCursor) peekCache() (t int64, v string) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*StringValue).value + return item.UnixNano(), item.(StringValue).value } // peekTSM returns the current time/value from tsm. @@ -1349,7 +1349,7 @@ func (c *stringDescendingCursor) peekCache() (t int64, v string) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*StringValue).value + return item.UnixNano(), item.(StringValue).value } // peekTSM returns the current time/value from tsm. @@ -1672,7 +1672,7 @@ func (c *booleanAscendingCursor) peekCache() (t int64, v bool) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*BooleanValue).value + return item.UnixNano(), item.(BooleanValue).value } // peekTSM returns the current time/value from tsm. @@ -1792,7 +1792,7 @@ func (c *booleanDescendingCursor) peekCache() (t int64, v bool) { } item := c.cache.values[c.cache.pos] - return item.UnixNano(), item.(*BooleanValue).value + return item.UnixNano(), item.(BooleanValue).value } // peekTSM returns the current time/value from tsm. diff --git a/tsdb/engine/tsm1/iterator.gen.go.tmpldata b/tsdb/engine/tsm1/iterator.gen.go.tmpldata index 4aacad62d82..36e7b8311cf 100644 --- a/tsdb/engine/tsm1/iterator.gen.go.tmpldata +++ b/tsdb/engine/tsm1/iterator.gen.go.tmpldata @@ -3,28 +3,28 @@ "Name":"Float", "name":"float", "Type":"float64", - "ValueType":"*FloatValue", + "ValueType":"FloatValue", "Nil":"0" }, { "Name":"Integer", "name":"integer", "Type":"int64", - "ValueType":"*IntegerValue", + "ValueType":"IntegerValue", "Nil":"0" }, { "Name":"String", "name":"string", "Type":"string", - "ValueType":"*StringValue", + "ValueType":"StringValue", "Nil":"\"\"" }, { "Name":"Boolean", "name":"boolean", "Type":"bool", - "ValueType":"*BooleanValue", + "ValueType":"BooleanValue", "Nil":"false" } ] diff --git a/tsdb/engine/tsm1/pools.go b/tsdb/engine/tsm1/pools.go index 8f8b05e26fb..92d6db75d1b 100644 --- a/tsdb/engine/tsm1/pools.go +++ b/tsdb/engine/tsm1/pools.go @@ -39,7 +39,7 @@ func getFloat64Values(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &FloatValue{} + buf[i] = FloatValue{} } } return buf[:size] @@ -65,7 +65,7 @@ func getIntegerValues(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &IntegerValue{} + buf[i] = IntegerValue{} } } return buf[:size] @@ -91,7 +91,7 @@ func getBooleanValues(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &BooleanValue{} + buf[i] = BooleanValue{} } } return buf[:size] @@ -117,7 +117,7 @@ func getStringValues(size int) []Value { for i, v := range buf { if v == nil { - buf[i] = &StringValue{} + buf[i] = StringValue{} } } return buf[:size] @@ -130,13 +130,13 @@ func putBooleanValues(buf []Value) { func putValue(buf []Value) { if len(buf) > 0 { switch buf[0].(type) { - case *FloatValue: + case FloatValue: putFloat64Values(buf) - case *IntegerValue: + case IntegerValue: putIntegerValues(buf) - case *BooleanValue: + case BooleanValue: putBooleanValues(buf) - case *StringValue: + case StringValue: putStringValues(buf) } } diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index c5ac249bd77..0f8aa0bc7e4 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -516,13 +516,13 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { encLen += 8 * len(v) // timestamps (8) switch v[0].(type) { - case *FloatValue, *IntegerValue: + case FloatValue, IntegerValue: encLen += 8 * len(v) - case *BooleanValue: + case BooleanValue: encLen += 1 * len(v) - case *StringValue: + case StringValue: for _, vv := range v { - str, ok := vv.(*StringValue) + str, ok := vv.(StringValue) if !ok { return nil, fmt.Errorf("non-string found in string value slice: %T", vv) } @@ -546,13 +546,13 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { for k, v := range w.Values { switch v[0].(type) { - case *FloatValue: + case FloatValue: curType = float64EntryType - case *IntegerValue: + case IntegerValue: curType = integerEntryType - case *BooleanValue: + case BooleanValue: curType = booleanEntryType - case *StringValue: + case StringValue: curType = stringEntryType default: return nil, fmt.Errorf("unsupported value type: %T", v[0]) @@ -572,19 +572,19 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { n += 8 switch vv := vv.(type) { - case *FloatValue: + case FloatValue: if curType != float64EntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value)) n += 8 - case *IntegerValue: + case IntegerValue: if curType != integerEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value)) n += 8 - case *BooleanValue: + case BooleanValue: if curType != booleanEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } @@ -594,7 +594,7 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { dst[n] = 0 } n++ - case *StringValue: + case StringValue: if curType != stringEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } @@ -647,19 +647,19 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { switch typ { case float64EntryType: for i := 0; i < nvals; i++ { - values[i] = &FloatValue{} + values[i] = FloatValue{} } case integerEntryType: for i := 0; i < nvals; i++ { - values[i] = &IntegerValue{} + values[i] = IntegerValue{} } case booleanEntryType: for i := 0; i < nvals; i++ { - values[i] = &BooleanValue{} + values[i] = BooleanValue{} } case stringEntryType: for i := 0; i < nvals; i++ { - values[i] = &StringValue{} + values[i] = StringValue{} } default: @@ -682,9 +682,11 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8]))) i += 8 - if fv, ok := values[j].(*FloatValue); ok { - fv.unixnano = un - fv.value = v + if fv, ok := values[j].(FloatValue); ok { + x := (&fv) + x.unixnano = un + x.value = v + values[j] = *x } case integerEntryType: if i+8 > len(b) { @@ -693,9 +695,11 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := int64(binary.BigEndian.Uint64(b[i : i+8])) i += 8 - if fv, ok := values[j].(*IntegerValue); ok { - fv.unixnano = un - fv.value = v + if fv, ok := values[j].(IntegerValue); ok { + x := (&fv) + x.unixnano = un + x.value = v + values[j] = *x } case booleanEntryType: if i >= len(b) { @@ -704,13 +708,16 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := b[i] i += 1 - if fv, ok := values[j].(*BooleanValue); ok { + if fv, ok := values[j].(BooleanValue); ok { + x := (&fv) + x.unixnano = un fv.unixnano = un if v == 1 { - fv.value = true + x.value = true } else { - fv.value = false + x.value = false } + values[j] = *x } case stringEntryType: if i+4 > len(b) { @@ -730,9 +737,11 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { v := string(b[i : i+length]) i += length - if fv, ok := values[j].(*StringValue); ok { - fv.unixnano = un - fv.value = v + if fv, ok := values[j].(StringValue); ok { + x := (&fv) + x.unixnano = un + x.value = v + values[j] = *x } default: return fmt.Errorf("unsupported value type: %#v", typ) diff --git a/tsdb/meta.go b/tsdb/meta.go index f37c57fc907..a9ee17eca94 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -1929,6 +1929,17 @@ func MarshalTags(tags map[string]string) []byte { return b } +// WalkTagKeys calls fn for each tag key associated with m. The order of the +// keys is undefined. +func (m *Measurement) WalkTagKeys(fn func(k string)) { + m.mu.RLock() + defer m.mu.RUnlock() + + for k := range m.seriesByTagKeyValue { + fn(k) + } +} + // TagKeys returns a list of the measurement's tag names. func (m *Measurement) TagKeys() []string { m.mu.RLock() diff --git a/tsdb/shard.go b/tsdb/shard.go index b460534dcaf..2c5ec264691 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -856,7 +856,7 @@ func (s *Shard) monitor() { } for _, m := range s.index.Measurements() { - for _, k := range m.TagKeys() { + m.WalkTagKeys(func(k string) { n := m.Cardinality(k) perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100) if perc > 100 { @@ -868,7 +868,7 @@ func (s *Shard) monitor() { s.logger.Printf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, m.Name, k) } - } + }) } } }