Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.1.3 backports #8063

Merged
merged 6 commits into from
Feb 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func (s *Service) Run(database, name string, t time.Time) error {
// backgroundLoop runs on a go routine and periodically executes CQs.
func (s *Service) backgroundLoop() {
leaseName := "continuous_querier"
t := time.NewTimer(s.RunInterval)
defer t.Stop()
defer s.wg.Done()
for {
select {
Expand All @@ -210,13 +212,15 @@ func (s *Service) backgroundLoop() {
s.Logger.Printf("running continuous queries by request for time: %v", req.Now)
s.runContinuousQueries(req)
}
case <-time.After(s.RunInterval):
case <-t.C:
if !s.hasContinuousQueries() {
t.Reset(s.RunInterval)
continue
}
if _, err := s.MetaClient.AcquireLease(leaseName); err == nil {
s.runContinuousQueries(&RunRequest{Now: time.Now()})
}
t.Reset(s.RunInterval)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,13 +683,13 @@ func (c *Cache) updateMemSize(b int64) {

func valueType(v Value) int {
switch v.(type) {
case *FloatValue:
case FloatValue:
return 1
case *IntegerValue:
case IntegerValue:
return 2
case *StringValue:
case StringValue:
return 3
case *BooleanValue:
case BooleanValue:
return 4
default:
return 0
Expand Down
40 changes: 39 additions & 1 deletion tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ type DefaultPlanner struct {

// lastPlanCheck is the last time Plan was called
lastPlanCheck time.Time

mu sync.RWMutex
// lastFindGenerations is the last time findGenerations was run
lastFindGenerations time.Time

// lastGenerations is the last set of generations found by findGenerations
lastGenerations tsmGenerations
}

// tsmGeneration represents the TSM files within a generation.
Expand Down Expand Up @@ -458,6 +465,16 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
// findGenerations groups all the TSM files by they generation based
// on their filename then returns the generations in descending order (newest first)
func (c *DefaultPlanner) findGenerations() tsmGenerations {
c.mu.RLock()
last := c.lastFindGenerations
lastGen := c.lastGenerations
c.mu.RUnlock()

if !last.IsZero() && c.FileStore.LastModified().Equal(last) {
return lastGen
}

genTime := c.FileStore.LastModified()
tsmStats := c.FileStore.Stats()
generations := make(map[int]*tsmGeneration, len(tsmStats))
for _, f := range tsmStats {
Expand All @@ -477,7 +494,15 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
for _, g := range generations {
orderedGenerations = append(orderedGenerations, g)
}
sort.Sort(orderedGenerations)
if !orderedGenerations.IsSorted() {
sort.Sort(orderedGenerations)
}

c.mu.Lock()
c.lastFindGenerations = genTime
c.lastGenerations = orderedGenerations
c.mu.Unlock()

return orderedGenerations
}

Expand Down Expand Up @@ -1323,3 +1348,16 @@ func (a tsmGenerations) chunk(size int) []tsmGenerations {
}
return chunks
}

func (a tsmGenerations) IsSorted() bool {
if len(a) == 1 {
return true
}

for i := 1; i < len(a); i++ {
if a.Less(i, i-1) {
return false
}
}
return true
}
92 changes: 46 additions & 46 deletions tsdb/engine/tsm1/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,31 @@ type Value interface {
func NewValue(t int64, value interface{}) Value {
switch v := value.(type) {
case int64:
return &IntegerValue{unixnano: t, value: v}
return IntegerValue{unixnano: t, value: v}
case float64:
return &FloatValue{unixnano: t, value: v}
return FloatValue{unixnano: t, value: v}
case bool:
return &BooleanValue{unixnano: t, value: v}
return BooleanValue{unixnano: t, value: v}
case string:
return &StringValue{unixnano: t, value: v}
return StringValue{unixnano: t, value: v}
}
return EmptyValue{}
}

func NewIntegerValue(t int64, v int64) Value {
return &IntegerValue{unixnano: t, value: v}
return IntegerValue{unixnano: t, value: v}
}

func NewFloatValue(t int64, v float64) Value {
return &FloatValue{unixnano: t, value: v}
return FloatValue{unixnano: t, value: v}
}

func NewBooleanValue(t int64, v bool) Value {
return &BooleanValue{unixnano: t, value: v}
return BooleanValue{unixnano: t, value: v}
}

func NewStringValue(t int64, v string) Value {
return &StringValue{unixnano: t, value: v}
return StringValue{unixnano: t, value: v}
}

type EmptyValue struct{}
Expand All @@ -134,11 +134,11 @@ func (e EmptyValue) Value() interface{} { return nil }
func (e EmptyValue) Size() int { return 0 }
func (e EmptyValue) String() string { return "" }

func (_ EmptyValue) internalOnly() {}
func (_ *StringValue) internalOnly() {}
func (_ *IntegerValue) internalOnly() {}
func (_ *BooleanValue) internalOnly() {}
func (_ *FloatValue) internalOnly() {}
func (_ EmptyValue) internalOnly() {}
func (_ StringValue) internalOnly() {}
func (_ IntegerValue) internalOnly() {}
func (_ BooleanValue) internalOnly() {}
func (_ FloatValue) internalOnly() {}

// Encode converts the values to a byte slice. If there are no values,
// this function panics.
Expand All @@ -148,13 +148,13 @@ func (a Values) Encode(buf []byte) ([]byte, error) {
}

switch a[0].(type) {
case *FloatValue:
case FloatValue:
return encodeFloatBlock(buf, a)
case *IntegerValue:
case IntegerValue:
return encodeIntegerBlock(buf, a)
case *BooleanValue:
case BooleanValue:
return encodeBooleanBlock(buf, a)
case *StringValue:
case StringValue:
return encodeStringBlock(buf, a)
}

Expand All @@ -168,13 +168,13 @@ func (a Values) InfluxQLType() (influxql.DataType, error) {
}

switch a[0].(type) {
case *FloatValue:
case FloatValue:
return influxql.Float, nil
case *IntegerValue:
case IntegerValue:
return influxql.Integer, nil
case *BooleanValue:
case BooleanValue:
return influxql.Boolean, nil
case *StringValue:
case StringValue:
return influxql.String, nil
}

Expand Down Expand Up @@ -225,7 +225,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = &decoded[i]
vals[i] = decoded[i]
}
return vals[:len(decoded)], err
case BlockInteger:
Expand All @@ -235,7 +235,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = &decoded[i]
vals[i] = decoded[i]
}
return vals[:len(decoded)], err

Expand All @@ -246,7 +246,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = &decoded[i]
vals[i] = decoded[i]
}
return vals[:len(decoded)], err

Expand All @@ -257,7 +257,7 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = &decoded[i]
vals[i] = decoded[i]
}
return vals[:len(decoded)], err

Expand All @@ -271,19 +271,19 @@ type FloatValue struct {
value float64
}

func (f *FloatValue) UnixNano() int64 {
func (f FloatValue) UnixNano() int64 {
return f.unixnano
}

func (f *FloatValue) Value() interface{} {
func (f FloatValue) Value() interface{} {
return f.value
}

func (f *FloatValue) Size() int {
func (f FloatValue) Size() int {
return 16
}

func (f *FloatValue) String() string {
func (f FloatValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.value)
}

Expand All @@ -306,7 +306,7 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
err := func() error {
for _, v := range values {
tsenc.Write(v.UnixNano())
venc.Push(v.(*FloatValue).value)
venc.Push(v.(FloatValue).value)
}
venc.Finish()

Expand Down Expand Up @@ -398,19 +398,19 @@ type BooleanValue struct {
value bool
}

func (b *BooleanValue) Size() int {
func (b BooleanValue) Size() int {
return 9
}

func (b *BooleanValue) UnixNano() int64 {
func (b BooleanValue) UnixNano() int64 {
return b.unixnano
}

func (b *BooleanValue) Value() interface{} {
func (b BooleanValue) Value() interface{} {
return b.value
}

func (f *BooleanValue) String() string {
func (f BooleanValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value())
}

Expand All @@ -430,7 +430,7 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
err := func() error {
for _, v := range values {
tsenc.Write(v.UnixNano())
venc.Write(v.(*BooleanValue).value)
venc.Write(v.(BooleanValue).value)
}

// Encoded timestamp values
Expand Down Expand Up @@ -516,19 +516,19 @@ type IntegerValue struct {
value int64
}

func (v *IntegerValue) Value() interface{} {
func (v IntegerValue) Value() interface{} {
return v.value
}

func (v *IntegerValue) UnixNano() int64 {
func (v IntegerValue) UnixNano() int64 {
return v.unixnano
}

func (v *IntegerValue) Size() int {
func (v IntegerValue) Size() int {
return 16
}

func (f *IntegerValue) String() string {
func (f IntegerValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value())
}

Expand All @@ -540,7 +540,7 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
err := func() error {
for _, v := range values {
tsEnc.Write(v.UnixNano())
vEnc.Write(v.(*IntegerValue).value)
vEnc.Write(v.(IntegerValue).value)
}

// Encoded timestamp values
Expand Down Expand Up @@ -626,31 +626,31 @@ type StringValue struct {
value string
}

func (v *StringValue) Value() interface{} {
func (v StringValue) Value() interface{} {
return v.value
}

func (v *StringValue) UnixNano() int64 {
func (v StringValue) UnixNano() int64 {
return v.unixnano
}

func (v *StringValue) Size() int {
func (v StringValue) Size() int {
return 8 + len(v.value)
}

func (f *StringValue) String() string {
func (f StringValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value())
}

func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
tsEnc := getTimeEncoder(len(values))
vEnc := getStringEncoder(len(values) * len(values[0].(*StringValue).value))
vEnc := getStringEncoder(len(values) * len(values[0].(StringValue).value))

var b []byte
err := func() error {
for _, v := range values {
tsEnc.Write(v.UnixNano())
vEnc.Write(v.(*StringValue).value)
vEnc.Write(v.(StringValue).value)
}

// Encoded timestamp values
Expand Down
Loading