Skip to content

Commit

Permalink
accept update on usage and threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Sep 7, 2017
1 parent 11b8c7c commit e888794
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 74 deletions.
2 changes: 1 addition & 1 deletion helm/charts/maestro/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: maestro
home: https://github.com/topfreegames/maestro
description: Maestro api and worker
version: 3.7.0
version: 3.7.1
maintainers:
- name: TFGCo
email: backend@tfgco.com
2 changes: 1 addition & 1 deletion metadata/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package metadata

//Version of Maestro
var Version = "3.7.0"
var Version = "3.7.1"

//KubeVersion is the desired Kubernetes version
var KubeVersion = "v1.7.0"
19 changes: 8 additions & 11 deletions models/scale_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,26 @@ package models

//ScaleInfo holds information about last time scheduler was verified if it needed to be scaled
// and how many time it was above or below threshold
//TODO: if time is changed on the yaml, the watcher (or the worker) needs to restart =/
type ScaleInfo struct {
pointsAboveUsage int
points []float32
pointer int
threshold int
usage float32
length int
}

// NewScaleInfo returns a new ScaleInfo
func NewScaleInfo(cap, threshold, usage int) *ScaleInfo {
func NewScaleInfo(cap int) *ScaleInfo {
return &ScaleInfo{
points: make([]float32, cap),
threshold: threshold,
usage: float32(usage) / 100,
points: make([]float32, cap),
}
}

// AddPoint inserts a new point on a circular list and updates pointsAboveUsage
func (s *ScaleInfo) AddPoint(point, total int) {
func (s *ScaleInfo) AddPoint(point, total int, usage float32) {
if s.length >= len(s.points) {
s.length = len(s.points) - 1
if s.points[s.pointer] >= s.usage {
if s.points[s.pointer] >= usage {
s.pointsAboveUsage = s.pointsAboveUsage - 1
}
}
Expand All @@ -40,14 +37,14 @@ func (s *ScaleInfo) AddPoint(point, total int) {
s.points[s.pointer] = currentUsage
s.length = s.length + 1
s.pointer = (s.pointer + 1) % cap(s.points)
if currentUsage >= s.usage {
if currentUsage >= usage {
s.pointsAboveUsage = s.pointsAboveUsage + 1
}
}

// IsAboveThreshold returns true if the percentage of points above usage is greater than threshold
func (s *ScaleInfo) IsAboveThreshold() bool {
return 100*s.pointsAboveUsage >= s.threshold*s.length
func (s *ScaleInfo) IsAboveThreshold(threshold int) bool {
return 100*s.pointsAboveUsage >= threshold*s.length
}

// GetPoints returns the array of points, where each point is the usage at that time
Expand Down
59 changes: 32 additions & 27 deletions models/scale_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,64 +16,69 @@ import (

var _ = Describe("ScaleInfo", func() {
It("should add point at position 0", func() {
var cap, threshold, usage int = 4, 50, 50
scaleInfo := models.NewScaleInfo(cap, threshold, usage)
scaleInfo.AddPoint(6, 10)
var cap, threshold int = 4, 50
var usage float32 = 0.5
scaleInfo := models.NewScaleInfo(cap)
scaleInfo.AddPoint(6, 10, usage)
points := scaleInfo.GetPoints()
Expect(points).To(HaveCap(cap))
Expect(points[0]).To(BeNumerically("~", 0.6, 1e-6))
Expect(scaleInfo.IsAboveThreshold()).To(BeTrue())
Expect(scaleInfo.IsAboveThreshold(threshold)).To(BeTrue())
})

It("should add point at position 1", func() {
var cap, threshold, usage int = 4, 50, 50
scaleInfo := models.NewScaleInfo(cap, threshold, usage)
scaleInfo.AddPoint(6, 10)
scaleInfo.AddPoint(4, 10)
var cap, threshold int = 4, 50
var usage float32 = 0.5
scaleInfo := models.NewScaleInfo(cap)
scaleInfo.AddPoint(6, 10, usage)
scaleInfo.AddPoint(4, 10, usage)
points := scaleInfo.GetPoints()
Expect(points).To(HaveCap(cap))
Expect(points[0]).To(BeNumerically("~", 0.6, 1e-6))
Expect(points[1]).To(BeNumerically("~", 0.4, 1e-6))
Expect(scaleInfo.IsAboveThreshold()).To(BeTrue())
Expect(scaleInfo.IsAboveThreshold(threshold)).To(BeTrue())
})

It("should add point at position 2", func() {
var cap, threshold, usage int = 4, 50, 50
scaleInfo := models.NewScaleInfo(cap, threshold, usage)
scaleInfo.AddPoint(6, 10)
scaleInfo.AddPoint(4, 10)
scaleInfo.AddPoint(3, 10)
var cap, threshold int = 4, 50
var usage float32 = 0.5
scaleInfo := models.NewScaleInfo(cap)
scaleInfo.AddPoint(6, 10, usage)
scaleInfo.AddPoint(4, 10, usage)
scaleInfo.AddPoint(3, 10, usage)
points := scaleInfo.GetPoints()
Expect(points).To(HaveCap(cap))
Expect(points[0]).To(BeNumerically("~", 0.6, 1e-6))
Expect(points[1]).To(BeNumerically("~", 0.4, 1e-6))
Expect(points[2]).To(BeNumerically("~", 0.3, 1e-6))
Expect(scaleInfo.IsAboveThreshold()).To(BeFalse())
Expect(scaleInfo.IsAboveThreshold(threshold)).To(BeFalse())
})

It("should ovewrite at position 0", func() {
var cap, threshold, usage int = 2, 50, 50
scaleInfo := models.NewScaleInfo(cap, threshold, usage)
scaleInfo.AddPoint(6, 10)
scaleInfo.AddPoint(4, 10)
scaleInfo.AddPoint(3, 10)
var cap, threshold int = 2, 50
var usage float32 = 0.5
scaleInfo := models.NewScaleInfo(cap)
scaleInfo.AddPoint(6, 10, usage)
scaleInfo.AddPoint(4, 10, usage)
scaleInfo.AddPoint(3, 10, usage)
points := scaleInfo.GetPoints()
Expect(points).To(HaveCap(cap))
Expect(points[0]).To(BeNumerically("~", 0.3, 1e-6))
Expect(points[1]).To(BeNumerically("~", 0.4, 1e-6))
Expect(scaleInfo.IsAboveThreshold()).To(BeFalse())
Expect(scaleInfo.IsAboveThreshold(threshold)).To(BeFalse())
})

It("should ovewrite at position 0", func() {
var cap, threshold, usage int = 2, 50, 50
scaleInfo := models.NewScaleInfo(cap, threshold, usage)
scaleInfo.AddPoint(6, 10)
scaleInfo.AddPoint(8, 10)
scaleInfo.AddPoint(7, 10)
var cap, threshold int = 2, 50
var usage float32 = 0.5
scaleInfo := models.NewScaleInfo(cap)
scaleInfo.AddPoint(6, 10, usage)
scaleInfo.AddPoint(8, 10, usage)
scaleInfo.AddPoint(7, 10, usage)
points := scaleInfo.GetPoints()
Expect(points).To(HaveCap(cap))
Expect(points[0]).To(BeNumerically("~", 0.7, 1e-6))
Expect(points[1]).To(BeNumerically("~", 0.8, 1e-6))
Expect(scaleInfo.IsAboveThreshold()).To(BeTrue())
Expect(scaleInfo.IsAboveThreshold(threshold)).To(BeTrue())
})
})
31 changes: 17 additions & 14 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func (w *Watcher) configure() error {

func (w *Watcher) configureLogger() {
w.Logger = w.Logger.WithFields(logrus.Fields{
"source": "maestro-watcher",
"version": metadata.Version,
"source": "maestro-watcher",
"version": metadata.Version,
"scheduler": w.SchedulerName,
})
}

Expand All @@ -135,37 +136,33 @@ func (w *Watcher) configureTimeout(configYaml *models.ConfigYAML) {
}

func (w *Watcher) configureAutoScale(configYaml *models.ConfigYAML) {
var capacity, usage, threshold int
var capacity int

capacity = configYaml.AutoScaling.Up.Trigger.Time / w.AutoScalingPeriod
if capacity <= 0 {
capacity = 1
}
threshold = configYaml.AutoScaling.Up.Trigger.Threshold
usage = configYaml.AutoScaling.Up.Trigger.Usage
w.ScaleUpInfo = models.NewScaleInfo(capacity, threshold, usage)
w.ScaleUpInfo = models.NewScaleInfo(capacity)

capacity = configYaml.AutoScaling.Down.Trigger.Time / w.AutoScalingPeriod
if capacity <= 0 {
capacity = 1
}
threshold = configYaml.AutoScaling.Down.Trigger.Threshold
usage = 100 - configYaml.AutoScaling.Down.Trigger.Usage
w.ScaleDownInfo = models.NewScaleInfo(capacity, threshold, usage)
w.ScaleDownInfo = models.NewScaleInfo(capacity)
}

// Start starts the watcher
func (w *Watcher) Start() {
l := w.Logger.WithFields(logrus.Fields{
"operation": "start",
})
l.Info("starting watcher")
w.Run = true
sigchan := make(chan os.Signal)
signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

ticker := time.NewTicker(time.Duration(w.AutoScalingPeriod) * time.Second)

// TODO better use that buckets algorithm?
for w.Run == true {
select {
case <-ticker.C:
Expand Down Expand Up @@ -428,8 +425,11 @@ func (w *Watcher) checkState(
return true, false, changedState
}

w.ScaleUpInfo.AddPoint(roomCount.Occupied, roomCount.Total())
if w.ScaleUpInfo.IsAboveThreshold() {
threshold := autoScalingInfo.Up.Trigger.Threshold
usage := float32(autoScalingInfo.Up.Trigger.Usage) / 100

w.ScaleUpInfo.AddPoint(roomCount.Occupied, roomCount.Total(), usage)
if w.ScaleUpInfo.IsAboveThreshold(threshold) {
inSync = false
if scheduler.State != models.StateSubdimensioned {
scheduler.State = models.StateSubdimensioned
Expand All @@ -440,8 +440,11 @@ func (w *Watcher) checkState(
}
}

w.ScaleDownInfo.AddPoint(roomCount.Ready, roomCount.Total())
if w.ScaleDownInfo.IsAboveThreshold() && roomCount.Total()-autoScalingInfo.Down.Delta >= autoScalingInfo.Min {
threshold = autoScalingInfo.Down.Trigger.Threshold
usage = float32(autoScalingInfo.Down.Trigger.Usage) / 100

w.ScaleDownInfo.AddPoint(roomCount.Ready, roomCount.Total(), 1-usage)
if w.ScaleDownInfo.IsAboveThreshold(threshold) && roomCount.Total()-autoScalingInfo.Down.Delta >= autoScalingInfo.Min {
inSync = false
if scheduler.State != models.StateOverdimensioned {
scheduler.State = models.StateOverdimensioned
Expand Down
49 changes: 29 additions & 20 deletions watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ var _ = Describe("Watcher", func() {

Expect(func() { go w.Start() }).ShouldNot(Panic())
Eventually(func() bool { return w.Run }).Should(BeTrue())
Eventually(func() bool { return hook.LastEntry() != nil }, 1500*time.Millisecond).Should(BeTrue())
Expect(hook.LastEntry().Message).To(Equal("error getting watcher lock"))
Eventually(func() bool { return hook.LastEntry() != nil }).Should(BeTrue())
Eventually(func() string { return hook.LastEntry().Message }, 1500*time.Millisecond).Should(Equal("error getting watcher lock"))
})

It("should not panic if lock is being used", func() {
Expand All @@ -220,8 +220,9 @@ var _ = Describe("Watcher", func() {

Expect(func() { go w.Start() }).ShouldNot(Panic())
Eventually(func() bool { return w.Run }).Should(BeTrue())
Eventually(func() bool { return hook.LastEntry() != nil }, 1500*time.Millisecond).Should(BeTrue())
Expect(hook.LastEntry().Message).To(Equal("unable to get watcher my-scheduler lock, maybe some other process has it..."))
Eventually(func() bool { return hook.LastEntry() != nil }).Should(BeTrue())
Eventually(func() string { return hook.LastEntry().Message }, 1500*time.Millisecond).
Should(Equal("unable to get watcher my-scheduler lock, maybe some other process has it..."))
})
})

Expand Down Expand Up @@ -761,19 +762,21 @@ var _ = Describe("Watcher", func() {
mockPipeline.EXPECT().Exec()

total := configYaml1.AutoScaling.Up.Trigger.Time/w.AutoScalingPeriod - 1
usage := float32(configYaml1.AutoScaling.Up.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 50*total {
w.ScaleUpInfo.AddPoint(0, 10)
w.ScaleUpInfo.AddPoint(0, 10, usage)
} else {
w.ScaleUpInfo.AddPoint(10, 10)
w.ScaleUpInfo.AddPoint(10, 10, usage)
}
}
total = configYaml1.AutoScaling.Down.Trigger.Time/w.AutoScalingPeriod - 1
usage = float32(configYaml1.AutoScaling.Down.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 50*total {
w.ScaleDownInfo.AddPoint(10, 10)
w.ScaleDownInfo.AddPoint(10, 10, usage)
} else {
w.ScaleDownInfo.AddPoint(0, 10)
w.ScaleDownInfo.AddPoint(0, 10, usage)
}
}

Expand Down Expand Up @@ -823,19 +826,21 @@ var _ = Describe("Watcher", func() {
mockPipeline.EXPECT().Exec().Times(configYaml1.AutoScaling.Up.Delta)

total := configYaml1.AutoScaling.Up.Trigger.Time/w.AutoScalingPeriod - 1
usage := float32(configYaml1.AutoScaling.Up.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 90*total {
w.ScaleUpInfo.AddPoint(10, 10)
w.ScaleUpInfo.AddPoint(10, 10, usage)
} else {
w.ScaleUpInfo.AddPoint(0, 10)
w.ScaleUpInfo.AddPoint(0, 10, usage)
}
}
total = configYaml1.AutoScaling.Down.Trigger.Time/w.AutoScalingPeriod - 1
usage = float32(configYaml1.AutoScaling.Down.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 90*total {
w.ScaleDownInfo.AddPoint(0, 10)
w.ScaleDownInfo.AddPoint(0, 10, usage)
} else {
w.ScaleDownInfo.AddPoint(10, 10)
w.ScaleDownInfo.AddPoint(10, 10, usage)
}
}

Expand Down Expand Up @@ -890,19 +895,21 @@ var _ = Describe("Watcher", func() {
)

total := configYaml1.AutoScaling.Up.Trigger.Time/w.AutoScalingPeriod - 1
usage := float32(configYaml1.AutoScaling.Up.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 50*total {
w.ScaleUpInfo.AddPoint(0, 10)
w.ScaleUpInfo.AddPoint(0, 10, usage)
} else {
w.ScaleUpInfo.AddPoint(10, 10)
w.ScaleUpInfo.AddPoint(10, 10, usage)
}
}
total = configYaml1.AutoScaling.Down.Trigger.Time/w.AutoScalingPeriod - 1
usage = float32(configYaml1.AutoScaling.Down.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 50*total {
w.ScaleDownInfo.AddPoint(10, 10)
w.ScaleDownInfo.AddPoint(10, 10, usage)
} else {
w.ScaleDownInfo.AddPoint(0, 10)
w.ScaleDownInfo.AddPoint(0, 10, usage)
}
}

Expand Down Expand Up @@ -947,19 +954,21 @@ var _ = Describe("Watcher", func() {
}

total := configYaml1.AutoScaling.Up.Trigger.Time/w.AutoScalingPeriod - 1
usage := float32(configYaml1.AutoScaling.Up.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 90*total {
w.ScaleUpInfo.AddPoint(0, 10)
w.ScaleUpInfo.AddPoint(0, 10, usage)
} else {
w.ScaleUpInfo.AddPoint(10, 10)
w.ScaleUpInfo.AddPoint(10, 10, usage)
}
}
total = configYaml1.AutoScaling.Down.Trigger.Time/w.AutoScalingPeriod - 1
usage = float32(configYaml1.AutoScaling.Down.Trigger.Usage) / 100
for i := 0; i < total; i++ {
if 100*i <= 90*total {
w.ScaleDownInfo.AddPoint(10, 10)
w.ScaleDownInfo.AddPoint(10, 10, usage)
} else {
w.ScaleDownInfo.AddPoint(0, 10)
w.ScaleDownInfo.AddPoint(0, 10, usage)
}
}

Expand Down

0 comments on commit e888794

Please sign in to comment.