From 94cd176b745f67e2a0b9fbf0576a1607223549e7 Mon Sep 17 00:00:00 2001 From: lftakakura Date: Fri, 9 Aug 2019 12:01:15 -0300 Subject: [PATCH] Feature/rolling update (#66) * Create rooms before deleting old ones * Remove unnecessary comments * Fix rolling-update flow * Create pod and delete old one after in each chunk * Remove waitCreatingAndDeleteOldPods * Log error if not able to get pod in waitTerminatingPods and waitCreatingPods * WIP: Use goroutines to perform rolling update * Use goroutines to perform rolling update --- api/scheduler_handler_test.go | 60 ++--- controller/controller.go | 2 +- controller/controller_test.go | 438 ++++++---------------------------- controller/utils.go | 248 ++++++++++--------- testing/common.go | 209 ++++++++++++++-- 5 files changed, 432 insertions(+), 525 deletions(-) diff --git a/api/scheduler_handler_test.go b/api/scheduler_handler_test.go index e54423b7a..862145dd4 100644 --- a/api/scheduler_handler_test.go +++ b/api/scheduler_handler_test.go @@ -76,7 +76,7 @@ var _ = Describe("Scheduler Handler", func() { }, "shutdownTimeout": 180, "autoscaling": { - "min": 100, + "min": 10, "up": { "delta": 10, "trigger": { @@ -293,7 +293,7 @@ var _ = Describe("Scheduler Handler", func() { app.Router.ServeHTTP(recorder, request) Expect(recorder.Code).To(Equal(http.StatusOK)) - Expect(recorder.Body.String()).To(Equal(`[{"autoscalingDownTriggerUsage":50,"autoscalingMin":100,"autoscalingUpTriggerUsage":70,"game":"game-name","name":"scheduler1","roomsCreating":2,"roomsOccupied":1,"roomsReady":1,"roomsTerminating":0,"state":"in-sync"}]`)) + Expect(recorder.Body.String()).To(Equal(`[{"autoscalingDownTriggerUsage":50,"autoscalingMin":10,"autoscalingUpTriggerUsage":70,"game":"game-name","name":"scheduler1","roomsCreating":2,"roomsOccupied":1,"roomsReady":1,"roomsTerminating":0,"state":"in-sync"}]`)) }) It("should list empty array when there aren't schedulers", func() { @@ -318,23 +318,23 @@ var _ = Describe("Scheduler Handler", func() { Context("when all services are healthy", func() { It("returns a status code of 201 and success body", func() { mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient) - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(100) + mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(10) mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( func(schedulerName string, statusInfo map[string]interface{}) { Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) }, - ).Times(100) - mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(100) - mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(100) - mockPipeline.EXPECT().Exec().Times(100) + ).Times(10) + mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(10) + mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(10) + mockPipeline.EXPECT().Exec().Times(10) MockInsertScheduler(mockDb, nil) MockUpdateScheduler(mockDb, nil, nil) mockRedisClient.EXPECT(). Get(models.GlobalPortsPoolKey). Return(goredis.NewStringResult(workerPortRange, nil)). - Times(100) + Times(10) var configYaml1 models.ConfigYAML err := yaml.Unmarshal([]byte(yamlString), &configYaml1) @@ -564,16 +564,16 @@ autoscaling: It("forwards scheduler event", func() { mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient) - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(100) + mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(10) mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( func(schedulerName string, statusInfo map[string]interface{}) { Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) }, - ).Times(100) - mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(100) - mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(100) - mockPipeline.EXPECT().Exec().Times(100) + ).Times(10) + mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(10) + mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(10) + mockPipeline.EXPECT().Exec().Times(10) mockDb.EXPECT().Query( gomock.Any(), @@ -589,7 +589,7 @@ autoscaling: MockInsertScheduler(mockDb, nil) MockUpdateScheduler(mockDb, nil, nil) mockRedisClient.EXPECT().Get(models.GlobalPortsPoolKey). - Return(goredis.NewStringResult(workerPortRange, nil)).Times(100) + Return(goredis.NewStringResult(workerPortRange, nil)).Times(10) var configYaml1 models.ConfigYAML err := yaml.Unmarshal([]byte(yamlString), &configYaml1) @@ -699,7 +699,7 @@ autoscaling: }, "shutdownTimeout": 180, "autoscaling": { - "min": 100, + "min": 10, "up": { "delta": 10, "trigger": { @@ -779,8 +779,8 @@ autoscaling: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -1111,8 +1111,8 @@ autoscaling: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -1202,8 +1202,8 @@ autoscaling: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, errors.New("err on db")) @@ -2381,8 +2381,8 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2439,8 +2439,8 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2780,8 +2780,8 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2873,7 +2873,7 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2954,8 +2954,8 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) diff --git a/controller/controller.go b/controller/controller.go index 3ca89e7a1..c7fcfbaa4 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -693,7 +693,7 @@ waitForLock: deletedPods := []v1.Pod{} for i, chunk := range podChunks { - l.Debugf("deleting chunk %d: %v", i, names(chunk)) + l.Debugf("updating chunk %d: %v", i, names(chunk)) newlyCreatedPods, newlyDeletedPods, timedout, canceled := replacePodsAndWait( l, roomManager, mr, clientset, db, redisClient.Client, diff --git a/controller/controller_test.go b/controller/controller_test.go index c4cb19104..a155658f2 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -353,7 +353,7 @@ var _ = Describe("Controller", func() { mt.MockInsertScheduler(mockDb, nil) mt.MockUpdateScheduler(mockDb, nil, nil) - mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -440,7 +440,7 @@ cmd: mt.MockInsertScheduler(mockDb, nil) mt.MockUpdateScheduler(mockDb, nil, nil) - mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -545,7 +545,7 @@ portRange: schedulerPortStart := configYaml1.PortRange.Start schedulerPortEnd := configYaml1.PortRange.End mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, - workerPortRange, schedulerPortStart, schedulerPortEnd) + workerPortRange, schedulerPortStart, schedulerPortEnd, 0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -1723,7 +1723,7 @@ portRange: schedulerPortStart := configYaml.PortRange.Start schedulerPortEnd := configYaml.PortRange.End mt.MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, - workerPortRange, schedulerPortStart, schedulerPortEnd) + workerPortRange, schedulerPortStart, schedulerPortEnd, 0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -3610,10 +3610,10 @@ cmd: // Remove old rooms mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml2) - // Create new roome + // Create new rooms // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2) - mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2, 0) + mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -3744,13 +3744,13 @@ portRange: Return(goredis.NewStringResult(workerPortRange, nil)) // Remove old rooms - mt.MockRemoveRoomStatusFromRedis(mockRedisClient, mockPipeline, pods, &configYaml2) + mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml2) // Create new rooms // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale mt.MockCreateRoomsWithPorts(mockRedisClient, mockPipeline, &configYaml2) mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, - workerPortRange, configYaml2.PortRange.Start, configYaml2.PortRange.End) + workerPortRange, configYaml2.PortRange.Start, configYaml2.PortRange.End, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -3917,9 +3917,9 @@ cmd: mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1) // Create new rooms - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2, 0) mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd) + workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -4089,12 +4089,12 @@ portRange: Return(goredis.NewStringResult(workerPortRange, nil)) // Remove old rooms - mt.MockRemoveRoomStatusFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1) + mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1) // Create new rooms mt.MockCreateRoomsWithPorts(mockRedisClient, mockPipeline, &configYaml2) mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, - workerPortRange, 20000, 20020) + workerPortRange, 20000, 20020, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -4399,11 +4399,7 @@ cmd: Expect(err.Error()).To(Equal("error getting lock")) }) - It("should return error if timeout when deleting rooms", func() { - pods, err := clientset.CoreV1().Pods("controller-name").List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(3)) - + It("should return error if timeout when creating rooms", func() { // Update scheduler calls := mt.NewCalls() @@ -4423,21 +4419,19 @@ cmd: calls.Append( mt.MockSelectScheduler(yaml1, mockDb, nil)) - // Delete first room - calls.Append( - mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1)) + // Create rooms + mt.MockCreateRoomsAnyTimes(mockRedisClient, mockPipeline, &configYaml1, 0) + mt.MockGetPortsFromPoolAnyTimes(&configYaml1, mockRedisClient, mockPortChooser, + workerPortRange, portStart, portEnd) - // Get timeout for waiting pod to be deleted and create new one - calls.Add( - mockClock.EXPECT(). - Now(). - Return(time.Unix(int64(timeoutSec+100), 0))) + // Get timeout for waiting pod to be created + mockClock.EXPECT(). + Now(). + Return(time.Unix(int64(timeoutSec+100), 0)).Times(3).Do(func() { + }) - // Timeod out, so rollback - calls.Append( - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1)) - calls.Append(mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd)) + // Delete newly created rooms + mt.MockRemoveAnyRoomsFromRedisAnyTimes(mockRedisClient, mockPipeline, &configYaml1, nil) calls.Append( mt.MockReturnRedisLock(mockRedisClient, lockKey, nil)) @@ -4463,158 +4457,47 @@ cmd: Expect(err.Error()).To(Equal("timedout waiting rooms to be replaced, rolled back")) }) - It("should return error if timeout when creating rooms", func() { - pods, err := clientset.CoreV1().Pods("controller-name").List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(3)) - + It("should return error if timeout when deleting rooms", func() { // Update scheduler calls := mt.NewCalls() - // Get lock - calls.Add( - mockClock.EXPECT(). - Now(). - Return(time.Unix(0, 0))) - calls.Add( - mockRedisClient.EXPECT(). - SetNX(lockKey, gomock.Any(), time.Duration(lockTimeoutMs)*time.Millisecond). - Return(goredis.NewBoolResult(true, nil))) + calls.Append( + mt.MockRedisLock(mockRedisClient, lockKey, lockTimeoutMs, true, nil)) // Set new operation manager description - mt.MockSetDescription(opManager, mockRedisClient, "running", nil) + calls.Append( + mt.MockSetDescription(opManager, mockRedisClient, "running", nil)) // Get scheduler from DB - calls.Add( - mockDb.EXPECT(). - Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", configYaml2.Name). - Do(func(scheduler *models.Scheduler, query string, modifier string) { - *scheduler = *models.NewScheduler(configYaml1.Name, configYaml1.Game, yaml1) - })) - - // Delete first room - for _, pod := range pods.Items { - calls.Add( - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) + calls.Append( + mt.MockSelectScheduler(yaml1, mockDb, nil)) - room := models.NewRoom(pod.GetName(), pod.GetNamespace()) + // Create room + mt.MockCreateRoomsAnyTimes(mockRedisClient, mockPipeline, &configYaml1, 0) + mt.MockGetPortsFromPoolAnyTimes(&configYaml1, mockRedisClient, mockPortChooser, + workerPortRange, portStart, portEnd) - for _, status := range allStatus { - calls.Add( - mockPipeline.EXPECT(). - SRem(models.GetRoomStatusSetRedisKey(room.SchedulerName, status), room.GetRoomRedisKey())) - mockPipeline.EXPECT(). - ZRem(models.GetLastStatusRedisKey(room.SchedulerName, status), room.ID) - } - calls.Add(mockPipeline.EXPECT().ZRem(models.GetRoomPingRedisKey(pod.GetNamespace()), room.ID)) - for _, mt := range allMetrics { - calls.Add( - mockPipeline.EXPECT(). - ZRem(models.GetRoomMetricsRedisKey(room.SchedulerName, mt), gomock.Any())) - } - calls.Add(mockPipeline.EXPECT().Del(room.GetRoomRedisKey())) - calls.Add(mockPipeline.EXPECT().Exec()) - break - } - - // Get timeout for waiting pod to be deleted and create new one - calls.Add( - mockClock.EXPECT(). - Now(). - Return(time.Unix(int64(timeoutSec-100), 0))) - - // Create new room with updated scheduler - for range pods.Items { - calls.Add(mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) - calls.Add( - mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( - func(schedulerName string, statusInfo map[string]interface{}) { - Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) - Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) - }, - )) - calls.Add( - mockPipeline.EXPECT(). - SAdd(models.GetRoomStatusSetRedisKey(configYaml1.Name, "creating"), gomock.Any())) - calls.Add( - mockPipeline.EXPECT(). - ZAdd(models.GetRoomPingRedisKey(configYaml1.Name), gomock.Any())) - calls.Add( - mockPipeline.EXPECT().Exec()) - calls.Add( - mockRedisClient.EXPECT(). - Get(models.GlobalPortsPoolKey). - Return(goredis.NewStringResult(workerPortRange, nil))) - calls.Add( - mockPortChooser.EXPECT(). - Choose(portStart, portEnd, 2). - Return([]int{5000, 5001})) - break - } + // Get lock + mockClock.EXPECT(). + Now(). + Return(time.Unix(0, 0)).Times(1) - // Get timeout for waiting new pod to be created - calls.Add( - mockClock.EXPECT(). - Now(). - Return(time.Unix(int64(timeoutSec+100), 0))) + // Mock expired time now + mockClock.EXPECT(). + Now(). + Return(time.Unix(int64(timeoutSec+100), 0)).Times(3) - // Timed out, so rollback - // Delete newly created room - for range pods.Items { - calls.Add( - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) + // Mock not expired time now + mockClock.EXPECT(). + Now(). + Return(time.Unix(int64(timeoutSec-100), 0)).Times(3) - for range allStatus { - calls.Add( - mockPipeline.EXPECT(). - SRem(gomock.Any(), gomock.Any())) - calls.Add(mockPipeline.EXPECT(). - ZRem(gomock.Any(), gomock.Any())) - } - for range allMetrics { - calls.Add( - mockPipeline.EXPECT(). - ZRem(gomock.Any(), gomock.Any())) - } - calls.Add(mockPipeline.EXPECT().ZRem(gomock.Any(), gomock.Any())) - calls.Add(mockPipeline.EXPECT().Del(gomock.Any())) - calls.Add(mockPipeline.EXPECT().Exec()) - break - } + // Delete old rooms + mt.MockRemoveAnyRoomsFromRedisAnyTimes(mockRedisClient, mockPipeline, &configYaml1, nil) - // Create new room to replace the one with old version deleted - for range pods.Items { - calls.Add(mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) - calls.Add( - mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( - func(schedulerName string, statusInfo map[string]interface{}) { - Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) - Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) - }, - )) - calls.Add( - mockPipeline.EXPECT(). - SAdd(models.GetRoomStatusSetRedisKey(configYaml1.Name, "creating"), gomock.Any())) - calls.Add( - mockPipeline.EXPECT(). - ZAdd(models.GetRoomPingRedisKey(configYaml1.Name), gomock.Any())) - calls.Add( - mockPipeline.EXPECT().Exec()) - calls.Add( - mockRedisClient.EXPECT(). - Get(models.GlobalPortsPoolKey). - Return(goredis.NewStringResult(workerPortRange, nil))) - calls.Add( - mockPortChooser.EXPECT(). - Choose(portStart, portEnd, 2). - Return([]int{5000, 5001})) - break - } + calls.Append( + mt.MockReturnRedisLock(mockRedisClient, lockKey, nil)) - calls.Add( - mockRedisClient.EXPECT(). - Eval(gomock.Any(), []string{lockKey}, gomock.Any()). - Return(goredis.NewCmdResult(nil, nil))) calls.Finish() err = controller.UpdateSchedulerConfig( @@ -4626,7 +4509,7 @@ cmd: redisClient, clientset, &configYaml2, - 10, + maxSurge, mockClock, nil, config, @@ -4659,54 +4542,18 @@ cmd: // Get scheduler from DB calls.Append(mt.MockSelectScheduler(yaml1, mockDb, nil)) - // Delete rooms - errRedis := errors.New("redis error") - for _, pod := range pods.Items { - // Retrieve ports to pool - room := models.NewRoom(pod.GetName(), pod.GetNamespace()) - calls.Add( - mockRedisClient.EXPECT(). - TxPipeline(). - Return(mockPipeline)) - for _, status := range allStatus { - calls.Add( - mockPipeline.EXPECT(). - SRem(models.GetRoomStatusSetRedisKey(room.SchedulerName, status), room.GetRoomRedisKey())) - calls.Add( - mockPipeline.EXPECT(). - ZRem(models.GetLastStatusRedisKey(room.SchedulerName, status), room.ID)) - } - calls.Add( - mockPipeline.EXPECT(). - ZRem(models.GetRoomPingRedisKey(pod.GetNamespace()), room.ID)) - for _, mt := range allMetrics { - mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(room.SchedulerName, mt), gomock.Any()) - } - calls.Add( - mockPipeline.EXPECT(). - Del(room.GetRoomRedisKey())) - calls.Add( - mockPipeline.EXPECT(). - Exec(). - Return(nil, errRedis)) - } - - calls.Add( - mockClock.EXPECT(). - Now(). - Return(time.Unix(0, 0))) + // Create room + mt.MockCreateRoomsAnyTimes(mockRedisClient, mockPipeline, &configYaml1, 3) + mt.MockGetPortsFromPoolAnyTimes(&configYaml1, mockRedisClient, mockPortChooser, + workerPortRange, portStart, portEnd) - // Create new pods - calls.Append( - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2)) - calls.Append( - mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd)) + // Delete old rooms + mt.MockRemoveAnyRoomsFromRedisAnyTimes(mockRedisClient, mockPipeline, &configYaml1, errors.New("redis error")) - calls.Add( - mockClock.EXPECT(). - Now(). - Return(time.Unix(0, 0))) + // Mock not expired time now + mockClock.EXPECT(). + Now(). + Return(time.Unix(int64(timeoutSec-100), 0)).AnyTimes() calls.Append( mt.MockUpdateSchedulersTable(mockDb, nil)) @@ -4787,8 +4634,8 @@ cmd: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2) - mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2, 0) + mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -4905,130 +4752,6 @@ cmd: } }) - It("should stop on createPodsAsTheyAreDeleted", func() { - yamlString := ` -name: scheduler-name-cancel -autoscaling: - min: 3 - up: - trigger: - limit: 10 -containers: -- name: container1 - image: image1 -` - newYamlString := ` -name: scheduler-name-cancel -autoscaling: - min: 3 - up: - trigger: - limit: 10 -containers: -- name: container1 - image: image2 -` - configYaml, _ := models.NewConfigYAML(yamlString) - newConfigYaml, err := models.NewConfigYAML(newYamlString) - Expect(err).ToNot(HaveOccurred()) - - mt.MockCreateScheduler(clientset, mockRedisClient, mockPipeline, mockDb, - logger, roomManager, mr, yamlString, timeoutSec, mockPortChooser, workerPortRange, portStart, portEnd) - - pods, err := clientset.CoreV1().Pods("scheduler-name-cancel").List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(3)) - - for _, pod := range pods.Items { - Expect(pod.ObjectMeta.Labels["version"]).To(Equal("v1.0")) - } - - // Select current scheduler yaml - mt.MockSelectScheduler(newYamlString, mockDb, nil) - - // Get redis lock - lockKey := "maestro-lock-key-scheduler-name-cancel" - mt.MockRedisLock(mockRedisClient, lockKey, lockTimeoutMs, true, nil) - - // Set new operation manager description - mt.MockSetDescription(opManager, mockRedisClient, "running", nil) - - // Remove old room - for i, pod := range pods.Items { - room := models.NewRoom(pod.GetName(), pod.GetNamespace()) - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline) - - for _, status := range allStatus { - mockPipeline.EXPECT(). - SRem( - models.GetRoomStatusSetRedisKey(room.SchedulerName, status), - room.GetRoomRedisKey()) - mockPipeline.EXPECT().ZRem( - models.GetLastStatusRedisKey(room.SchedulerName, status), room.ID) - } - for _, mt := range allMetrics { - mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(room.SchedulerName, mt), gomock.Any()) - } - mockPipeline.EXPECT(). - ZRem(models.GetRoomPingRedisKey(pod.GetNamespace()), room.ID) - - if i == len(pods.Items)-1 { - mockPipeline.EXPECT().Del(room.GetRoomRedisKey()).Do(func(_ string) { - opManager.Cancel(opManager.GetOperationKey()) - }) - } else { - mockPipeline.EXPECT().Del(room.GetRoomRedisKey()) - } - - mockPipeline.EXPECT().Exec() - } - - // Delete keys from OperationManager (to cancel it) - mt.MockDeleteRedisKey(opManager, mockRedisClient, mockPipeline, nil) - - // Create rooms to rollback - mt.MockCreateRooms(mockRedisClient, mockPipeline, newConfigYaml) - mt.MockGetPortsFromPool(newConfigYaml, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd) - - // Retrieve redis lock - mt.MockReturnRedisLock(mockRedisClient, lockKey, nil) - - err = controller.UpdateSchedulerConfig( - context.Background(), - logger, - roomManager, - mr, - mockDb, - redisClient, - clientset, - configYaml, - maxSurge, - &clock.Clock{}, - nil, - config, - opManager, - ) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("operation was canceled, rolled back")) - - pods, err = clientset.CoreV1().Pods(configYaml.Name).List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(configYaml.AutoScaling.Min)) - - for _, pod := range pods.Items { - Expect(pod.GetName()).To(ContainSubstring("scheduler-name-cancel-")) - Expect(pod.GetName()).To(HaveLen(len("scheduler-name-cancel-") + 8)) - Expect(pod.Spec.Containers[0].Env[0].Name).To(Equal("MAESTRO_SCHEDULER_NAME")) - Expect(pod.Spec.Containers[0].Env[0].Value).To(Equal("scheduler-name-cancel")) - Expect(pod.Spec.Containers[0].Env[1].Name).To(Equal("MAESTRO_ROOM_ID")) - Expect(pod.Spec.Containers[0].Env[1].Value).To(Equal(pod.GetName())) - Expect(pod.Spec.Containers[0].Env).To(HaveLen(2)) - Expect(pod.ObjectMeta.Labels["heritage"]).To(Equal("maestro")) - Expect(pod.ObjectMeta.Labels["version"]).To(Equal("v1.0")) - } - }) - It("should stop on waitCreatingPods", func() { yamlString := ` name: scheduler-name-cancel @@ -5053,7 +4776,6 @@ containers: image: image2 ` configYaml, _ := models.NewConfigYAML(yamlString) - newConfigYaml, _ := models.NewConfigYAML(newYamlString) mt.MockCreateScheduler(clientset, mockRedisClient, mockPipeline, mockDb, logger, roomManager, mr, yamlString, timeoutSec, mockPortChooser, workerPortRange, portStart, portEnd) @@ -5079,9 +4801,6 @@ containers: // Delete keys from OperationManager (to cancel it) mt.MockDeleteRedisKey(opManager, mockRedisClient, mockPipeline, nil) - // Create rooms to rollback - mt.MockCreateRooms(mockRedisClient, mockPipeline, newConfigYaml) - // But first, create rooms for i := 0; i < configYaml.AutoScaling.Min; i++ { mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline) @@ -5120,25 +4839,6 @@ containers: mockPipeline.EXPECT().Exec() } - // But first, remove old rooms - for i := 0; i < configYaml2.AutoScaling.Min; i++ { - mockRedisClient.EXPECT().TxPipeline(). - Return(mockPipeline) - for _, status := range allStatus { - mockPipeline.EXPECT().SRem( - models.GetRoomStatusSetRedisKey(configYaml.Name, status), gomock.Any()) - mockPipeline.EXPECT().ZRem( - models.GetLastStatusRedisKey(configYaml.Name, status), gomock.Any()) - } - for _, mt := range allMetrics { - mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(configYaml.Name, mt), gomock.Any()) - } - mockPipeline.EXPECT().ZRem( - models.GetRoomPingRedisKey(configYaml.Name), gomock.Any()) - mockPipeline.EXPECT().Del(gomock.Any()) - mockPipeline.EXPECT().Exec() - } - // Retrieve redis lock mt.MockReturnRedisLock(mockRedisClient, lockKey, nil) @@ -5334,8 +5034,8 @@ containers: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1) - mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1, 0) + mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -5561,8 +5261,8 @@ containers: mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml) // Create new rooms - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - mt.MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + mt.MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) diff --git a/controller/utils.go b/controller/utils.go index 17fc5d8a5..b9842b051 100644 --- a/controller/utils.go +++ b/controller/utils.go @@ -12,6 +12,7 @@ import ( "fmt" "math" "strings" + "sync" "time" "github.com/sirupsen/logrus" @@ -39,44 +40,125 @@ func replacePodsAndWait( willTimeoutAt time.Time, clock clockinterfaces.Clock, configYAML *models.ConfigYAML, - podsToDelete []v1.Pod, + podsChunk []v1.Pod, scheduler *models.Scheduler, operationManager *models.OperationManager, ) (createdPods []v1.Pod, deletedPods []v1.Pod, timedout, canceled bool) { + timedout = false + canceled = false createdPods = []v1.Pod{} deletedPods = []v1.Pod{} + var wg sync.WaitGroup + var mutex = &sync.Mutex{} + + // create a chunk of pods (chunkSize = maxSurge) and remove a chunk of old ones + wg.Add(len(podsChunk)) + for _, pod := range podsChunk { + go func(pod v1.Pod) { + defer wg.Done() + localTimedout, localCanceled := createNewRemoveOldPod( + logger, + roomManager, + mr, + clientset, + db, + redisClient, + willTimeoutAt, + clock, + configYAML, + scheduler, + operationManager, + mutex, + pod, + &createdPods, + &deletedPods, + ) + // if a routine is timedout or canceled, + // rolling update should stop + if localTimedout { + mutex.Lock() + timedout = localTimedout + mutex.Unlock() + } + if localCanceled { + mutex.Lock() + canceled = localCanceled + mutex.Unlock() + } + }(pod) + } + wg.Wait() - for _, pod := range podsToDelete { - logger.Debugf("deleting pod %s", pod.GetName()) + return createdPods, deletedPods, timedout, canceled +} - err := DeletePodAndRoom(logger, roomManager, mr, clientset, redisClient, - configYAML, pod.GetName(), reportersConstants.ReasonUpdate) - if err == nil || strings.Contains(err.Error(), "redis") { - deletedPods = append(deletedPods, pod) - } - if err != nil { - logger.WithError(err).Debugf("error deleting pod %s", pod.GetName()) - } +func createNewRemoveOldPod( + logger logrus.FieldLogger, + roomManager models.RoomManager, + mr *models.MixedMetricsReporter, + clientset kubernetes.Interface, + db pginterfaces.DB, + redisClient redisinterfaces.RedisClient, + willTimeoutAt time.Time, + clock clockinterfaces.Clock, + configYAML *models.ConfigYAML, + scheduler *models.Scheduler, + operationManager *models.OperationManager, + mutex *sync.Mutex, + pod v1.Pod, + createdPods *[]v1.Pod, + deletedPods *[]v1.Pod, +) (timedout, canceled bool) { + logger.Debug("creating pod") + + // create new pod + newPod, err := roomManager.Create(logger, mr, redisClient, + db, clientset, configYAML, scheduler) + + if err != nil { + logger.WithError(err).Debug("error creating pod") + return false, false } - now := clock.Now() - timeout := willTimeoutAt.Sub(now) - createdPods, timedout, canceled = createPodsAsTheyAreDeleted( - logger, roomManager, mr, clientset, db, redisClient, timeout, configYAML, - deletedPods, scheduler, operationManager) + mutex.Lock() + *createdPods = append(*createdPods, *newPod) + mutex.Unlock() + + // wait for new pod to be created + timeout := willTimeoutAt.Sub(clock.Now()) + timedout, canceled = waitCreatingPods( + logger, clientset, timeout, configYAML.Name, + []v1.Pod{*newPod}, operationManager, mr) if timedout || canceled { - return createdPods, deletedPods, timedout, canceled + return timedout, canceled + } + + // delete old pod + logger.Debugf("deleting pod %s", pod.GetName()) + err = DeletePodAndRoom(logger, roomManager, mr, clientset, redisClient, + configYAML, pod.GetName(), reportersConstants.ReasonUpdate) + if err == nil || strings.Contains(err.Error(), "redis") { + mutex.Lock() + *deletedPods = append(*deletedPods, pod) + mutex.Unlock() + } + if err != nil { + logger.WithError(err).Debugf("error deleting pod %s", pod.GetName()) + return false, false } + // wait for old pods to be deleted + // we assume that maxSurge == maxUnavailable as we can't set maxUnavailable yet + // so for every pod created in a chunk one is deleted right after it timeout = willTimeoutAt.Sub(clock.Now()) - timedout, canceled = waitCreatingPods( + timedout, canceled = waitTerminatingPods( logger, clientset, timeout, configYAML.Name, - createdPods, operationManager, mr) + []v1.Pod{pod}, operationManager, mr) if timedout || canceled { - return createdPods, deletedPods, timedout, canceled + return timedout, canceled } - return createdPods, deletedPods, false, false + return false, false } // In rollback, it must delete newly created pod and @@ -136,13 +218,12 @@ func rollback( } waitTimeout := willTimeoutAt.Sub(time.Now()) - err = waitTerminatingPods( + timedout, _ := waitTerminatingPods( logger, clientset, waitTimeout, configYAML.Name, - createdPodChunks[i], - mr, + createdPodChunks[i], nil, mr, ) - if err != nil { - return err + if timedout { + return errors.New("timeout waiting for rooms to be removed") } } @@ -174,92 +255,15 @@ func rollback( return nil } -func createPodsAsTheyAreDeleted( - l logrus.FieldLogger, - roomManager models.RoomManager, - mr *models.MixedMetricsReporter, - clientset kubernetes.Interface, - db pginterfaces.DB, - redisClient redisinterfaces.RedisClient, - timeout time.Duration, - configYAML *models.ConfigYAML, - deletedPods []v1.Pod, - scheduler *models.Scheduler, - operationManager *models.OperationManager, -) (createdPods []v1.Pod, timedout, wasCanceled bool) { - logger := l.WithFields(logrus.Fields{ - "operation": "controller.waitTerminatingPods", - "scheduler": configYAML.Name, - }) - - createdPods = []v1.Pod{} - logger.Debugf("pods to terminate: %#v", names(deletedPods)) - - timeoutTimer := time.NewTimer(timeout) - defer timeoutTimer.Stop() - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - i := 0 - for { - exit := true - select { - case <-ticker.C: - if operationManager.WasCanceled() { - logger.Warn("operation was canceled") - return createdPods, false, true - } - - for j := i; j < len(deletedPods); j++ { - pod := deletedPods[i] - err := mr.WithSegment(models.SegmentPod, func() error { - var err error - _, err = clientset.CoreV1().Pods(configYAML.Name).Get(pod.GetName(), getOptions) - return err - }) - if err == nil || !strings.Contains(err.Error(), "not found") { - logger.WithField("pod", pod.GetName()).Debugf("pod still exists") - exit = false - break - } - - newPod, err := roomManager.Create(logger, mr, redisClient, - db, clientset, configYAML, scheduler) - if err != nil { - exit = false - logger. - WithError(err). - Info("error creating pod") - break - } - - i = j + 1 - - createdPods = append(createdPods, *newPod) - } - case <-timeoutTimer.C: - err := errors.New("timeout waiting for rooms to be removed") - logger.WithError(err).Error("stopping scale") - return createdPods, true, false - } - - if exit { - logger.Info("terminating pods were successfully removed") - break - } - } - - return createdPods, false, false -} - func waitTerminatingPods( l logrus.FieldLogger, clientset kubernetes.Interface, timeout time.Duration, namespace string, deletedPods []v1.Pod, + operationManager *models.OperationManager, mr *models.MixedMetricsReporter, -) error { +) (timedout, wasCanceled bool) { logger := l.WithFields(logrus.Fields{ "operation": "controller.waitTerminatingPods", "scheduler": namespace, @@ -276,6 +280,12 @@ func waitTerminatingPods( exit := true select { case <-ticker.C: + // operationManger is nil when rolling back (rollback can't be canceled) + if operationManager != nil && operationManager.WasCanceled() { + logger.Warn("operation was canceled") + return false, true + } + for _, pod := range deletedPods { err := mr.WithSegment(models.SegmentPod, func() error { var err error @@ -293,11 +303,19 @@ func waitTerminatingPods( exit = false break } + + if err != nil && !strings.Contains(err.Error(), "not found") { + logger. + WithError(err). + WithField("pod", pod.GetName()). + Info("error getting pod") + exit = false + break + } } case <-timeoutTimer.C: - err := errors.New("timeout waiting for rooms to be removed") - logger.WithError(err).Error("stopping scale") - return err + logger.Error("timeout waiting for rooms to be removed") + return true, false } if exit { @@ -306,7 +324,7 @@ func waitTerminatingPods( } } - return nil + return false, false } func waitCreatingPods( @@ -332,7 +350,8 @@ func waitCreatingPods( exit := true select { case <-ticker.C: - if operationManager.WasCanceled() { + // operationManger is nil when rolling back (rollback can't be canceled) + if operationManager != nil && operationManager.WasCanceled() { logger.Warn("operation was canceled") return false, true } @@ -373,6 +392,15 @@ func waitCreatingPods( break } + if err != nil && !strings.Contains(err.Error(), "not found") { + logger. + WithError(err). + WithField("pod", pod.GetName()). + Info("error getting pod") + exit = false + break + } + if !models.IsPodReady(createdPod) { logger.WithField("pod", createdPod.GetName()).Debug("pod not ready yet, waiting...") exit = false diff --git a/testing/common.go b/testing/common.go index 282abb12f..2dfaf8045 100644 --- a/testing/common.go +++ b/testing/common.go @@ -169,6 +169,7 @@ func MockDeleteOldVersions( func mockRemoveRoomsFromRedis( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, + numRooms int, pods *v1.PodList, configYaml *models.ConfigYAML, ) (calls *Calls) { @@ -187,6 +188,38 @@ func mockRemoveRoomsFromRedis( string(models.MemAutoScalingPolicyType), } + for index := 0; index < numRooms; index++ { + calls.Add( + mockRedisClient.EXPECT(). + TxPipeline(). + Return(mockPipeline)) + for _, status := range allStatus { + calls.Add( + mockPipeline.EXPECT(). + SRem(models.GetRoomStatusSetRedisKey(configYaml.Name, status), gomock.Any())) + calls.Add( + mockPipeline.EXPECT(). + ZRem(models.GetLastStatusRedisKey(configYaml.Name, status), gomock.Any())) + } + calls.Add( + mockPipeline.EXPECT(). + ZRem(models.GetRoomPingRedisKey(configYaml.Name), gomock.Any())) + for _, mt := range allMetrics { + calls.Add( + mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(configYaml.Name, mt), gomock.Any())) + } + calls.Add( + mockPipeline.EXPECT(). + Del(gomock.Any())) + calls.Add( + mockPipeline.EXPECT(). + Exec()) + } + + if pods == nil { + return calls + } + for _, pod := range pods.Items { room := models.NewRoom(pod.GetName(), pod.GetNamespace()) calls.Add( @@ -219,36 +252,40 @@ func mockRemoveRoomsFromRedis( return calls } -// MockRemoveRoomStatusFromRedis removes room only from redis -func MockRemoveRoomStatusFromRedis( +// MockRemoveAnyRoomsFromRedis removes any rooms from redis +func MockRemoveAnyRoomsFromRedis( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, - pods *v1.PodList, + numRooms int, configYaml *models.ConfigYAML, ) (calls *Calls) { - return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, - pods, configYaml) + return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, numRooms, nil, configYaml) } -// MockRemoveRoomsFromRedis mocks the room creation from pod +// MockRemoveRoomsFromRedis removes room only from redis func MockRemoveRoomsFromRedis( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, pods *v1.PodList, configYaml *models.ConfigYAML, ) (calls *Calls) { - return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, - pods, configYaml) + return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, 0, pods, configYaml) } func mockCreateRooms( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, configYaml *models.ConfigYAML, + roomCount int, ) (calls *Calls) { calls = NewCalls() - for i := 0; i < configYaml.AutoScaling.Min; i++ { + count := configYaml.AutoScaling.Min + if roomCount > 0 { + count = roomCount + } + + for i := 0; i < count; i++ { calls.Add( mockRedisClient.EXPECT(). TxPipeline(). @@ -280,13 +317,106 @@ func mockCreateRooms( return calls } +// MockRemoveAnyRoomsFromRedisAnyTimes removes any rooms from redis +func MockRemoveAnyRoomsFromRedisAnyTimes( + mockRedisClient *redismocks.MockRedisClient, + mockPipeline *redismocks.MockPipeliner, + configYaml *models.ConfigYAML, redisErrOrNil error, +) { + + allStatus := []string{ + models.StatusCreating, + models.StatusReady, + models.StatusOccupied, + models.StatusTerminating, + models.StatusTerminated, + } + + allMetrics := []string{ + string(models.CPUAutoScalingPolicyType), + string(models.MemAutoScalingPolicyType), + } + + mockRedisClient.EXPECT(). + TxPipeline(). + Return(mockPipeline).AnyTimes() + for _, status := range allStatus { + mockPipeline.EXPECT(). + SRem(models.GetRoomStatusSetRedisKey(configYaml.Name, status), gomock.Any()).AnyTimes() + mockPipeline.EXPECT(). + ZRem(models.GetLastStatusRedisKey(configYaml.Name, status), gomock.Any()).AnyTimes() + } + mockPipeline.EXPECT(). + ZRem(models.GetRoomPingRedisKey(configYaml.Name), gomock.Any()).AnyTimes() + for _, mt := range allMetrics { + mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(configYaml.Name, mt), gomock.Any()).AnyTimes() + } + mockPipeline.EXPECT(). + Del(gomock.Any()).AnyTimes() + + if redisErrOrNil == nil { + mockPipeline.EXPECT(). + Exec().AnyTimes() + } else { + mockPipeline.EXPECT(). + Exec().Return(nil, redisErrOrNil).AnyTimes() + } + +} + +// MockCreateRoomsAnyTimes mocks the creation of rooms on redis +func MockCreateRoomsAnyTimes( + mockRedisClient *redismocks.MockRedisClient, + mockPipeline *redismocks.MockPipeliner, + configYaml *models.ConfigYAML, + times int, +) { + tx := mockRedisClient.EXPECT(). + TxPipeline(). + Return(mockPipeline) + + hmset := mockPipeline.EXPECT(). + HMSet(gomock.Any(), gomock.Any()). + Do(func(schedulerName string, statusInfo map[string]interface{}) { + gomega.Expect(statusInfo["status"]). + To(gomega.Equal(models.StatusCreating)) + gomega.Expect(statusInfo["lastPing"]). + To(gomega.BeNumerically("~", time.Now().Unix(), 1)) + }) + + sadd := mockPipeline.EXPECT(). + SAdd(models.GetRoomStatusSetRedisKey(configYaml.Name, "creating"), + gomock.Any()) + + zadd := mockPipeline.EXPECT(). + ZAdd(models.GetRoomPingRedisKey(configYaml.Name), gomock.Any()) + + exec := mockPipeline.EXPECT().Exec() + + if times > 0 { + tx.Times(times) + hmset.Times(times) + sadd.Times(times) + zadd.Times(times) + exec.Times(times) + + return + } + tx.AnyTimes() + hmset.AnyTimes() + sadd.AnyTimes() + zadd.AnyTimes() + exec.AnyTimes() +} + // MockCreateRooms mocks the creation of rooms on redis func MockCreateRooms( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, configYaml *models.ConfigYAML, + roomCount int, ) (calls *Calls) { - return mockCreateRooms(mockRedisClient, mockPipeline, configYaml) + return mockCreateRooms(mockRedisClient, mockPipeline, configYaml, roomCount) } // MockCreateRoomsWithPorts mocks the creation of rooms on redis when @@ -296,7 +426,7 @@ func MockCreateRoomsWithPorts( mockPipeline *redismocks.MockPipeliner, configYaml *models.ConfigYAML, ) (calls *Calls) { - return mockCreateRooms(mockRedisClient, mockPipeline, configYaml) + return mockCreateRooms(mockRedisClient, mockPipeline, configYaml, 0) } // MockCreateScheduler mocks the creation of a scheduler @@ -355,7 +485,7 @@ func MockCreateScheduler( Times(configYaml.AutoScaling.Min)) calls.Append( - MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd)) + MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0)) calls.Append( MockUpdateScheduler(mockDb, nil, nil)) @@ -373,10 +503,15 @@ func MockGetPortsFromPool( mockRedisClient *redismocks.MockRedisClient, mockPortChooser *mocks.MockPortChooser, workerPortRange string, - portStart, portEnd int, + portStart, portEnd, times int, ) (calls *Calls) { calls = NewCalls() + callTimes := configYaml.AutoScaling.Min + if times > 0 { + callTimes = times + } + if !configYaml.HasPorts() { return } @@ -385,7 +520,7 @@ func MockGetPortsFromPool( mockRedisClient.EXPECT(). Get(models.GlobalPortsPoolKey). Return(goredis.NewStringResult(workerPortRange, nil)). - Times(configYaml.AutoScaling.Min) + Times(callTimes) } if mockPortChooser == nil { @@ -400,7 +535,7 @@ func MockGetPortsFromPool( mockPortChooser.EXPECT(). Choose(portStart, portEnd, nPorts). Return(ports). - Times(configYaml.AutoScaling.Min) + Times(callTimes) } if configYaml.Version() == "v1" { @@ -414,6 +549,50 @@ func MockGetPortsFromPool( return calls } +// MockGetPortsFromPoolAnyTimes mocks the function that chooses random ports +// to be used as HostPort in the pods +func MockGetPortsFromPoolAnyTimes( + configYaml *models.ConfigYAML, + mockRedisClient *redismocks.MockRedisClient, + mockPortChooser *mocks.MockPortChooser, + workerPortRange string, + portStart, portEnd int, +) { + if !configYaml.HasPorts() { + return + } + + if !configYaml.PortRange.IsSet() { + mockRedisClient.EXPECT(). + Get(models.GlobalPortsPoolKey). + Return(goredis.NewStringResult(workerPortRange, nil)). + AnyTimes() + } + + if mockPortChooser == nil { + return + } + + givePorts := func(nPorts int) { + ports := make([]int, nPorts) + for i := 0; i < nPorts; i++ { + ports[i] = portStart + i + } + mockPortChooser.EXPECT(). + Choose(portStart, portEnd, nPorts). + Return(ports). + AnyTimes() + } + + if configYaml.Version() == "v1" { + givePorts(len(configYaml.Ports)) + } else if configYaml.Version() == "v2" { + for _, container := range configYaml.Containers { + givePorts(len(container.Ports)) + } + } +} + // MockInsertScheduler inserts a new scheduler into database func MockInsertScheduler( mockDb *pgmocks.MockDB,