Commit 306e3f2

Timeout on occupied status

henrod committed Jun 20, 2017
1 parent 25f6d6c commit 306e3f2

Showing 18 changed files with 677 additions and 26 deletions.
10 changes: 9 additions & 1 deletion Makefile
@@ -118,7 +118,7 @@ integration-run:
 	MAESTRO_EXTENSIONS_REDIS_URL=redis://${MY_IP}:6333 \
 	ginkgo -tags integration -cover -r \
 	-randomizeAllSpecs -randomizeSuites \
-	-skipMeasurements worker api models controller;
+	-skipMeasurements worker api models controller;

 int-ci: integration-board clear-coverage-profiles deps-test-ci integration-run gather-integration-profiles

@@ -158,3 +158,11 @@ minikube-ci:

 work:
 	@go run main.go worker
+
+clean-int-tests:
+	@echo 'deleting maestro-test-* namespaces'
+	@kubectl --context minikube get namespace | grep maestro-test- | awk '{print $$1}' | xargs kubectl --context minikube delete namespace
+	@echo 'done'
+	@echo 'deleting maestro-test-* nodes'
+	@kubectl --context minikube get nodes | grep maestro-test- | awk '{print $$1}' | xargs kubectl --context minikube delete nodes
+	@echo 'done'
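For reference, the namespace half of the cleanup that the new `clean-int-tests` target performs with kubectl can also be written against the Kubernetes API directly. Below is a minimal client-go sketch (era-appropriate, pre-context API). It is an assumption-based equivalent, not part of the commit: it loads the default kubeconfig instead of pinning the minikube context, and it omits the node deletion, which would be analogous.

```go
package main

import (
	"fmt"
	"strings"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load whatever kubeconfig is active; the Makefile target instead
	// pins --context minikube explicitly.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// Equivalent of: kubectl get namespace | grep maestro-test- | xargs kubectl delete namespace
	namespaces, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, ns := range namespaces.Items {
		if strings.HasPrefix(ns.Name, "maestro-test-") {
			fmt.Println("deleting namespace", ns.Name)
			if err := clientset.CoreV1().Namespaces().Delete(ns.Name, &metav1.DeleteOptions{}); err != nil {
				fmt.Println("error deleting", ns.Name, ":", err)
			}
		}
	}
}
```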
8 changes: 8 additions & 0 deletions api/room_handler_test.go
@@ -38,6 +38,8 @@ var _ = Describe("Room Handler", func() {
 	rKey := "scheduler:schedulerName:rooms:roomName"
 	pKey := "scheduler:schedulerName:ping"
 	sKey := "scheduler:schedulerName:status:ready"
+	lKey := "scheduler:schedulerName:last:status:occupied"
+	roomName := "roomName"
 	status := "ready"
 	allStatusKeys := []string{
 		"scheduler:schedulerName:status:creating",
@@ -60,6 +62,7 @@ var _ = Describe("Room Handler", func() {
 				"status": status,
 			})
 			mockPipeline.EXPECT().ZAdd(pKey, gomock.Any())
+			mockPipeline.EXPECT().ZRem(lKey, roomName)
 			mockPipeline.EXPECT().SAdd(sKey, rKey)
 			for _, key := range allStatusKeys {
 				mockPipeline.EXPECT().SRem(key, rKey)
@@ -156,6 +159,7 @@ var _ = Describe("Room Handler", func() {
 				"status": status,
 			})
 			mockPipeline.EXPECT().ZAdd(pKey, gomock.Any())
+			mockPipeline.EXPECT().ZRem(lKey, roomName)
 			mockPipeline.EXPECT().SAdd(sKey, rKey)
 			for _, key := range allStatusKeys {
 				mockPipeline.EXPECT().SRem(key, rKey)
@@ -179,6 +183,8 @@ var _ = Describe("Room Handler", func() {
 	url := "/scheduler/schedulerName/rooms/roomName/status"
 	rKey := "scheduler:schedulerName:rooms:roomName"
 	pKey := "scheduler:schedulerName:ping"
+	lKey := "scheduler:schedulerName:last:status:occupied"
+	roomName := "roomName"
 	status := "ready"
 	newSKey := fmt.Sprintf("scheduler:schedulerName:status:%s", status)
 	allStatusKeys := []string{
@@ -202,6 +208,7 @@ var _ = Describe("Room Handler", func() {
 				"status": status,
 			})
 			mockPipeline.EXPECT().ZAdd(pKey, gomock.Any())
+			mockPipeline.EXPECT().ZRem(lKey, roomName)
 			mockPipeline.EXPECT().SAdd(newSKey, rKey)
 			for _, key := range allStatusKeys {
 				mockPipeline.EXPECT().SRem(key, rKey)
@@ -298,6 +305,7 @@ var _ = Describe("Room Handler", func() {
 				"status": status,
 			})
 			mockPipeline.EXPECT().ZAdd(pKey, gomock.Any())
+			mockPipeline.EXPECT().ZRem(lKey, roomName)
 			for _, key := range allStatusKeys {
 				mockPipeline.EXPECT().SRem(key, rKey)
 			}
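Every status-update test above gains the same new expectation: `ZRem(lKey, roomName)`. The idea is that the `last:status:occupied` sorted set scores each room by the time it entered the occupied status, so a ping reporting any other status must take the room out of that set. A sketch of the pipeline being mocked follows, assuming go-redis v6; the helper name `setRoomStatus` and the exact hash fields are illustrative, not taken from the commit.

```go
package models

import (
	"fmt"
	"time"

	redis "github.com/go-redis/redis"
)

// setRoomStatus is a hypothetical sketch of the Redis pipeline the handler tests mock.
func setRoomStatus(p redis.Pipeliner, schedulerName, roomName, status string, allStatus []string) {
	rKey := fmt.Sprintf("scheduler:%s:rooms:%s", schedulerName, roomName)
	pKey := fmt.Sprintf("scheduler:%s:ping", schedulerName)
	lKey := fmt.Sprintf("scheduler:%s:last:status:occupied", schedulerName)

	now := time.Now().Unix()
	p.HMSet(rKey, map[string]interface{}{"status": status, "lastPing": now})
	p.ZAdd(pKey, redis.Z{Score: float64(now), Member: roomName}) // refresh ping timestamp
	if status == "occupied" {
		// entering occupied: start the occupied-timeout clock
		p.ZAdd(lKey, redis.Z{Score: float64(now), Member: roomName})
	} else {
		// leaving (or never in) occupied: stop the clock, as the tests expect
		p.ZRem(lKey, roomName)
	}
	p.SAdd(fmt.Sprintf("scheduler:%s:status:%s", schedulerName, status), rKey)
	for _, other := range allStatus {
		if other != status {
			p.SRem(fmt.Sprintf("scheduler:%s:status:%s", schedulerName, other), rKey)
		}
	}
}
```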
4 changes: 3 additions & 1 deletion api/scheduler_handler_test.go
@@ -458,8 +458,10 @@ var _ = Describe("Scheduler Handler", func() {
 			for _, status := range allStatus {
 				mockPipeline.EXPECT().
 					SRem(models.GetRoomStatusSetRedisKey(room.SchedulerName, status), room.GetRoomRedisKey())
+				mockPipeline.EXPECT().
+					ZRem(models.GetLastStatusRedisKey(room.SchedulerName, status), room.ID)
 			}
-			mockPipeline.EXPECT().ZRem(models.GetRoomPingRedisKey(svc.GetNamespace()), room.ID)
+			mockPipeline.EXPECT().ZRem(models.GetRoomPingRedisKey(room.SchedulerName), room.ID)
 			mockPipeline.EXPECT().Del(room.GetRoomRedisKey())
 			mockPipeline.EXPECT().Exec()
 		}
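The scheduler-deletion test now also expects a `ZRem` on the last-status key for every status, alongside the ping-key fix (keyed by `room.SchedulerName` rather than the service's namespace). Together, the expectations imply a `Room.ClearAll` of roughly the shape below. This is a hedged sketch, since `models/room.go` is among the files not expanded on this page; go-redis v6 (`github.com/go-redis/redis`) is assumed.

```go
// Hypothetical shape of Room.ClearAll implied by the test expectations above.
func (r *Room) ClearAll(client redis.Cmdable) error {
	pipe := client.TxPipeline()
	for _, status := range allStatus { // e.g. creating, ready, occupied, terminating, ...
		pipe.SRem(GetRoomStatusSetRedisKey(r.SchedulerName, status), r.GetRoomRedisKey())
		pipe.ZRem(GetLastStatusRedisKey(r.SchedulerName, status), r.ID)
	}
	pipe.ZRem(GetRoomPingRedisKey(r.SchedulerName), r.ID)
	pipe.Del(r.GetRoomRedisKey())
	_, err := pipe.Exec()
	return err
}
```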
1 change: 1 addition & 0 deletions config/local.yaml
@@ -23,6 +23,7 @@ scaleUpTimeoutSeconds: 300
 scaleDownTimeoutSeconds: 300
 deleteTimeoutSeconds: 600
 pingTimeout: 60
+occupiedTimeout: 180
 updateTimeoutSeconds: 300
 sentry:
   url: ""
1 change: 1 addition & 0 deletions config/test.yaml
@@ -23,6 +23,7 @@ scaleUpTimeoutSeconds: 300
 scaleDownTimeoutSeconds: 300
 deleteTimeoutSeconds: 150
 pingTimeout: 300
+occupiedTimeout: 180
 updateTimeoutSeconds: 600
 newrelic:
   app: maestro
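Both configs gain `occupiedTimeout: 180`. Presumably the watcher turns this into the `since` cutoff consumed by the new controller function in the next file. A hypothetical wiring sketch follows; the watcher itself is among the unexpanded files, and the viper-style `config` accessor, the function name, and its parameter list are all assumptions.

```go
// Hypothetical watcher-side wiring for the new setting; not part of the expanded diff.
func deleteOccupiedTimeoutRooms(
	logger logrus.FieldLogger,
	mr *models.MixedMetricsReporter,
	redisClient redisinterfaces.RedisClient,
	clientset kubernetes.Interface,
	config *viper.Viper,
	schedulerName string,
) {
	occupiedTimeout := config.GetInt64("occupiedTimeout") // seconds; 180 in both configs above
	since := time.Now().Unix() - occupiedTimeout          // rooms occupied before this are timed out
	err := controller.DeleteRoomsOccupiedTimeout(logger, mr, redisClient, clientset, schedulerName, since)
	if err != nil {
		logger.WithError(err).Error("error deleting rooms on occupied timeout")
	}
}
```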
45 changes: 41 additions & 4 deletions controller/controller.go
@@ -203,7 +203,7 @@ func DeleteRoomsNoPingSince(logger logrus.FieldLogger, mr *models.MixedMetricsRe
 		return err
 	})
 	if err != nil {
-		logger.WithError(err).Error("error listing rooms")
+		logger.WithError(err).Error("error listing rooms no ping since")
 		return err
 	}
 	if len(roomNames) == 0 {
@@ -213,7 +213,7 @@ func DeleteRoomsNoPingSince(logger logrus.FieldLogger, mr *models.MixedMetricsRe
 	for _, roomName := range roomNames {
 		err := deleteServiceAndPod(logger, mr, clientset, schedulerName, roomName)
 		if err != nil && !strings.Contains(err.Error(), "not found") {
-			logger.WithField("roomName", roomName).WithError(err).Error("error deleting room")
+			logger.WithFields(logrus.Fields{"roomName": roomName, "function": "DeleteRoomsNoPingSince"}).WithError(err).Error("error deleting room")
 		} else {
 			room := models.NewRoom(roomName, schedulerName)
 			err = room.ClearAll(redisClient)
@@ -226,6 +226,43 @@ func DeleteRoomsNoPingSince(logger logrus.FieldLogger, mr *models.MixedMetricsRe
 	return nil
 }

+// DeleteRoomsOccupiedTimeout deletes rooms that have been in occupied status for longer than the timeout
+func DeleteRoomsOccupiedTimeout(
+	logger logrus.FieldLogger,
+	mr *models.MixedMetricsReporter,
+	redisClient redisinterfaces.RedisClient,
+	clientset kubernetes.Interface,
+	schedulerName string,
+	since int64,
+) error {
+	var roomNames []string
+	var err error
+	err = mr.WithSegment(models.SegmentZRangeBy, func() error {
+		roomNames, err = models.GetRoomsOccupiedTimeout(redisClient, schedulerName, since)
+		return err
+	})
+	if err != nil {
+		logger.WithError(err).Error("error listing rooms occupied timeout")
+		return err
+	}
+	if len(roomNames) == 0 {
+		return nil
+	}
+	for _, roomName := range roomNames {
+		err := deleteServiceAndPod(logger, mr, clientset, schedulerName, roomName)
+		if err != nil && !strings.Contains(err.Error(), "not found") {
+			logger.WithFields(logrus.Fields{"roomName": roomName, "function": "DeleteRoomsOccupiedTimeout"}).WithError(err).Error("error deleting room")
+		} else {
+			room := models.NewRoom(roomName, schedulerName)
+			err = room.ClearAll(redisClient)
+			if err != nil {
+				logger.WithFields(logrus.Fields{"roomName": roomName, "function": "DeleteRoomsOccupiedTimeout"}).WithError(err).Error("error removing room info from redis")
+			}
+		}
+	}
+	return nil
+}
+
 // ScaleUp scales up a scheduler using its config
 func ScaleUp(logger logrus.FieldLogger, mr *models.MixedMetricsReporter, db pginterfaces.DB, redisClient redisinterfaces.RedisClient, clientset kubernetes.Interface, scheduler *models.Scheduler, amount, timeoutSec int, initalOp bool) error {
 	l := logger.WithFields(logrus.Fields{
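`DeleteRoomsOccupiedTimeout` delegates the listing to `models.GetRoomsOccupiedTimeout`, which is defined in one of the files not expanded on this page. Since rooms are scored in the `last:status:occupied` sorted set by the time they became occupied, a plausible implementation is a ZRANGEBYSCORE of everything at or below the `since` cutoff. The sketch below is written under that assumption, with go-redis v6's `Cmdable` standing in for the project's Redis interface.

```go
// Hypothetical sketch of models.GetRoomsOccupiedTimeout; the real
// implementation is not shown in this diff.
func GetRoomsOccupiedTimeout(client redis.Cmdable, schedulerName string, since int64) ([]string, error) {
	lKey := fmt.Sprintf("scheduler:%s:last:status:occupied", schedulerName)
	// Members with score <= since have been occupied for longer than the timeout.
	return client.ZRangeByScore(lKey, redis.ZRangeBy{
		Min: "-inf",
		Max: strconv.FormatInt(since, 10),
	}).Result()
}
```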
@@ -477,13 +514,13 @@ func UpdateSchedulerConfig(
 			err = errors.New("timeout during new room creation")
 			return maestroErrors.NewKubernetesError("error when creating new rooms. Maestro will scale up, if necessary, with previous room configuration.", err)
 		case <-ticker.C:
-			svcs, err := clientset.CoreV1().Services(schedulerName).List(listOptions)
+			k8sPods, err := clientset.CoreV1().Pods(schedulerName).List(listOptions)
 			if err != nil {
 				logger.WithError(err).Error("error when getting services")
 				continue
 			}

-			numberCurrentPods := len(svcs.Items)
+			numberCurrentPods := len(k8sPods.Items)
 			numberPodsToCreate := numberOldPods - numberCurrentPods

 			for i := 0; i < numberPodsToCreate; i++ {
(The remaining 12 changed files were not loaded on this page.)