Skip to content

Commit

Permalink
Read occupiedTimeout from scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Jun 20, 2017
1 parent 306e3f2 commit b26ed73
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 19 deletions.
6 changes: 6 additions & 0 deletions api/scheduler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (g *SchedulerCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
logger.Debug("Creating scheduler...")

timeoutSec := g.App.Config.GetInt("scaleUpTimeoutSeconds")
if payload.OccupiedTimeout == 0 {
payload.OccupiedTimeout = g.App.Config.GetInt64("occupiedTimeout")
}
err := mr.WithSegment(models.SegmentController, func() error {
return controller.CreateScheduler(l, mr, g.App.DB, g.App.RedisClient, g.App.KubernetesClient, payload, timeoutSec)
})
Expand Down Expand Up @@ -151,6 +154,9 @@ func (g *SchedulerUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
}

timeoutSec := g.App.Config.GetInt("updateTimeoutSeconds")
if payload.OccupiedTimeout == 0 {
payload.OccupiedTimeout = g.App.Config.GetInt64("occupiedTimeout")
}
logger.WithField("time", time.Now()).Info("Starting update")
err = mr.WithSegment(models.SegmentController, func() error {
return controller.UpdateSchedulerConfig(
Expand Down
1 change: 1 addition & 0 deletions models/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ConfigYAML struct {
Cmd []string `yaml:"cmd" json:"cmd"`
NodeAffinity string `yaml:"affinity" json:"affinity"`
NodeToleration string `yaml:"toleration" json:"toleration"`
OccupiedTimeout int64 `yaml:"occupiedTimeout" json:"occupiedTimeout"`
}

// NewScheduler is the scheduler constructor
Expand Down
52 changes: 36 additions & 16 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Watcher struct {
Run bool
SchedulerName string
gracefulShutdown *gracefulShutdown
OccupiedTimeout int64
}

// NewWatcher is the watcher constructor
Expand All @@ -70,6 +71,7 @@ func NewWatcher(
w.loadConfigurationDefaults()
w.configure()
w.configureLogger()
w.configureTimeout()
return w
}

Expand Down Expand Up @@ -101,6 +103,22 @@ func (w *Watcher) configureLogger() {
})
}

func (w *Watcher) configureTimeout() error {
scheduler := models.NewScheduler(w.SchedulerName, "", "")
err := w.MetricsReporter.WithSegment(models.SegmentSelect, func() error {
return scheduler.Load(w.DB)
})
if err != nil {
return err
}
configYaml, err := models.NewConfigYAML(scheduler.YAML)
if err != nil {
return err
}
w.OccupiedTimeout = configYaml.OccupiedTimeout
return nil
}

// Start starts the watcher
func (w *Watcher) Start() {
l := w.Logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -159,22 +177,24 @@ func (w *Watcher) RemoveDeadRooms() {
logger.WithError(err).Error("error removing dead rooms")
}

since = time.Now().Unix() - w.Config.GetInt64("occupiedTimeout")
logger = w.Logger.WithFields(logrus.Fields{
"executionID": uuid.NewV4().String(),
"operation": "removeDeadOccupiedRooms",
"since": since,
})
err = controller.DeleteRoomsOccupiedTimeout(
logger,
w.MetricsReporter,
w.RedisClient.Client,
w.KubernetesClient,
w.SchedulerName,
since,
)
if err != nil {
logger.WithError(err).Error("error removing old occupied rooms")
if w.OccupiedTimeout > 0 {
since = time.Now().Unix() - w.OccupiedTimeout
logger = w.Logger.WithFields(logrus.Fields{
"executionID": uuid.NewV4().String(),
"operation": "removeDeadOccupiedRooms",
"since": since,
})
err = controller.DeleteRoomsOccupiedTimeout(
logger,
w.MetricsReporter,
w.RedisClient.Client,
w.KubernetesClient,
w.SchedulerName,
since,
)
if err != nil {
logger.WithError(err).Error("error removing old occupied rooms")
}
}
}

Expand Down
62 changes: 59 additions & 3 deletions watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
name: controller-name
game: controller
image: controller/controller:v123
occupiedTimeout: 300
ports:
- containerPort: 1234
protocol: UDP
Expand Down Expand Up @@ -80,6 +81,14 @@ var _ = Describe("Watcher", func() {
config.Set("watcher.autoScalingPeriod", autoScalingPeriod)
config.Set("watcher.lockKey", lockKey)
config.Set("watcher.lockTimeoutMs", lockTimeoutMs)

mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)

w := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, name)
Expect(w.AutoScalingPeriod).To(Equal(autoScalingPeriod))
Expect(w.Config).To(Equal(config))
Expand All @@ -95,6 +104,14 @@ var _ = Describe("Watcher", func() {

It("should return configured new watcher using configuration defaults", func() {
name := "my-scheduler"

mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)

w := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, name)
Expect(w.AutoScalingPeriod).To(Equal(10))
Expect(w.LockKey).To(Equal("maestro-lock-key"))
Expand All @@ -113,6 +130,13 @@ var _ = Describe("Watcher", func() {
err := yaml.Unmarshal([]byte(yaml1), &configYaml1)
Expect(err).NotTo(HaveOccurred())

mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", configYaml1.Name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)

w := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, configYaml1.Name)
Expect(w).NotTo(BeNil())

Expand Down Expand Up @@ -166,6 +190,14 @@ var _ = Describe("Watcher", func() {

It("should not panic if error acquiring lock", func() {
name := "my-scheduler"

mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)

w := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, name)
Expect(w).NotTo(BeNil())
defer func() { w.Run = false }()
Expand All @@ -181,6 +213,14 @@ var _ = Describe("Watcher", func() {

It("should not panic if lock is being used", func() {
name := "my-scheduler"

mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)

w := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, name)
Expect(w).NotTo(BeNil())
defer func() { w.Run = false }()
Expand All @@ -202,6 +242,14 @@ var _ = Describe("Watcher", func() {
BeforeEach(func() {
err := yaml.Unmarshal([]byte(yaml1), &configYaml1)
Expect(err).NotTo(HaveOccurred())

mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", configYaml1.Name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)

w = watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, configYaml1.Name)
Expect(w).NotTo(BeNil())
})
Expand Down Expand Up @@ -688,6 +736,14 @@ var _ = Describe("Watcher", func() {
BeforeEach(func() {
err := yaml.Unmarshal([]byte(yaml1), &configYaml1)
Expect(err).NotTo(HaveOccurred())

mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", configYaml1.Name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)

w = watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, configYaml1.Name)
Expect(w).NotTo(BeNil())
})
Expand All @@ -709,7 +765,7 @@ var _ = Describe("Watcher", func() {
}).Return(redis.NewStringSliceResult([]string{}, nil))

// DeleteRoomsOccupiedTimeout
ts = time.Now().Unix() - w.Config.GetInt64("occupiedTimeout")
ts = time.Now().Unix() - w.OccupiedTimeout
expectedRooms := []string{"room1", "room2", "room3"}
mockRedisClient.EXPECT().ZRangeByScore(
lKey,
Expand Down Expand Up @@ -753,7 +809,7 @@ var _ = Describe("Watcher", func() {
}).Return(redis.NewStringSliceResult([]string{}, errors.New("some error")))

// DeleteRoomsOccupiedTimeout
ts = time.Now().Unix() - w.Config.GetInt64("occupiedTimeout")
ts = time.Now().Unix() - w.OccupiedTimeout
expectedRooms := []string{"room1", "room2", "room3"}
mockRedisClient.EXPECT().ZRangeByScore(
lKey,
Expand Down Expand Up @@ -798,7 +854,7 @@ var _ = Describe("Watcher", func() {
}).Return(redis.NewStringSliceResult([]string{}, nil))

// DeleteRoomsOccupiedTimeout
ts = time.Now().Unix() - w.Config.GetInt64("occupiedTimeout")
ts = time.Now().Unix() - w.OccupiedTimeout
expectedRooms := []string{"room1", "room2", "room3"}
mockRedisClient.EXPECT().ZRangeByScore(
lKey,
Expand Down
56 changes: 56 additions & 0 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,42 @@ import (
)

var _ = Describe("Worker", func() {
yaml1 := `
name: controller-name
game: controller
image: controller/controller:v123
occupiedTimeout: 300
ports:
- containerPort: 1234
protocol: UDP
name: port1
- containerPort: 7654
protocol: TCP
name: port2
limits:
memory: "66Mi"
cpu: "2"
shutdownTimeout: 20
autoscaling:
min: 3
up:
delta: 2
trigger:
usage: 60
time: 100
cooldown: 200
down:
delta: 1
trigger:
usage: 30
time: 500
cooldown: 500
env:
- name: MY_ENV_VAR
value: myvalue
cmd:
- "./room"
`
Describe("NewWorker", func() {
It("should return configured new worker", func() {
mockRedisClient.EXPECT().Ping()
Expand Down Expand Up @@ -90,6 +126,12 @@ var _ = Describe("Worker", func() {

It("should add watcher to watchers map and run it in a goroutine", func() {
schedulerNames := []string{"scheduler-1"}
mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", schedulerNames[0]).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)
w.EnsureRunningWatchers(schedulerNames)
Expect(w.Watchers).To(HaveKey(schedulerNames[0]))
Expect(w.Watchers[schedulerNames[0]].SchedulerName).To(Equal(schedulerNames[0]))
Expand All @@ -98,6 +140,12 @@ var _ = Describe("Worker", func() {

It("should set watcher.Run to true", func() {
schedulerNames := []string{"scheduler-1"}
mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", schedulerNames[0]).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)
watcher1 := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, schedulerNames[0])
w.Watchers[watcher1.SchedulerName] = watcher1
Expect(w.Watchers[schedulerNames[0]].Run).To(BeFalse())
Expand All @@ -120,6 +168,14 @@ var _ = Describe("Worker", func() {

It("should remove watcher if it should not be running", func() {
schedulerNames := []string{"scheduler-1", "scheduler-2"}
for _, name := range schedulerNames {
mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", name).Do(
func(scheduler *models.Scheduler, query, name string) {
scheduler.Name = name
scheduler.YAML = yaml1
},
)
}
watcher1 := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, schedulerNames[0])
w.Watchers[watcher1.SchedulerName] = watcher1
Expect(w.Watchers[schedulerNames[0]].Run).To(BeFalse())
Expand Down

0 comments on commit b26ed73

Please sign in to comment.