diff --git a/api/scheduler_handler_test.go b/api/scheduler_handler_test.go index e54423b7a..862145dd4 100644 --- a/api/scheduler_handler_test.go +++ b/api/scheduler_handler_test.go @@ -76,7 +76,7 @@ var _ = Describe("Scheduler Handler", func() { }, "shutdownTimeout": 180, "autoscaling": { - "min": 100, + "min": 10, "up": { "delta": 10, "trigger": { @@ -293,7 +293,7 @@ var _ = Describe("Scheduler Handler", func() { app.Router.ServeHTTP(recorder, request) Expect(recorder.Code).To(Equal(http.StatusOK)) - Expect(recorder.Body.String()).To(Equal(`[{"autoscalingDownTriggerUsage":50,"autoscalingMin":100,"autoscalingUpTriggerUsage":70,"game":"game-name","name":"scheduler1","roomsCreating":2,"roomsOccupied":1,"roomsReady":1,"roomsTerminating":0,"state":"in-sync"}]`)) + Expect(recorder.Body.String()).To(Equal(`[{"autoscalingDownTriggerUsage":50,"autoscalingMin":10,"autoscalingUpTriggerUsage":70,"game":"game-name","name":"scheduler1","roomsCreating":2,"roomsOccupied":1,"roomsReady":1,"roomsTerminating":0,"state":"in-sync"}]`)) }) It("should list empty array when there aren't schedulers", func() { @@ -318,23 +318,23 @@ var _ = Describe("Scheduler Handler", func() { Context("when all services are healthy", func() { It("returns a status code of 201 and success body", func() { mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient) - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(100) + mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(10) mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( func(schedulerName string, statusInfo map[string]interface{}) { Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) }, - ).Times(100) - mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(100) - mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(100) - mockPipeline.EXPECT().Exec().Times(100) + ).Times(10) + mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(10) + mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(10) + mockPipeline.EXPECT().Exec().Times(10) MockInsertScheduler(mockDb, nil) MockUpdateScheduler(mockDb, nil, nil) mockRedisClient.EXPECT(). Get(models.GlobalPortsPoolKey). Return(goredis.NewStringResult(workerPortRange, nil)). 
- Times(100) + Times(10) var configYaml1 models.ConfigYAML err := yaml.Unmarshal([]byte(yamlString), &configYaml1) @@ -564,16 +564,16 @@ autoscaling: It("forwards scheduler event", func() { mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient) - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(100) + mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(10) mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( func(schedulerName string, statusInfo map[string]interface{}) { Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) }, - ).Times(100) - mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(100) - mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(100) - mockPipeline.EXPECT().Exec().Times(100) + ).Times(10) + mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(10) + mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(10) + mockPipeline.EXPECT().Exec().Times(10) mockDb.EXPECT().Query( gomock.Any(), @@ -589,7 +589,7 @@ autoscaling: MockInsertScheduler(mockDb, nil) MockUpdateScheduler(mockDb, nil, nil) mockRedisClient.EXPECT().Get(models.GlobalPortsPoolKey). - Return(goredis.NewStringResult(workerPortRange, nil)).Times(100) + Return(goredis.NewStringResult(workerPortRange, nil)).Times(10) var configYaml1 models.ConfigYAML err := yaml.Unmarshal([]byte(yamlString), &configYaml1) @@ -699,7 +699,7 @@ autoscaling: }, "shutdownTimeout": 180, "autoscaling": { - "min": 100, + "min": 10, "up": { "delta": 10, "trigger": { @@ -779,8 +779,8 @@ autoscaling: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -1111,8 +1111,8 @@ autoscaling: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -1202,8 +1202,8 @@ autoscaling: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, errors.New("err on db")) @@ -2381,8 +2381,8 @@ game: game-name // Create new roome // It will use 
the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2439,8 +2439,8 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2780,8 +2780,8 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2873,7 +2873,7 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) @@ -2954,8 +2954,8 @@ game: game-name // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd) + MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + MockGetPortsFromPool(&configYaml, mockRedisClient, nil, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table MockUpdateSchedulersTable(mockDb, nil) diff --git a/controller/controller.go b/controller/controller.go index 3ca89e7a1..c7fcfbaa4 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -693,7 +693,7 @@ waitForLock: deletedPods := []v1.Pod{} for i, chunk := range podChunks { - l.Debugf("deleting chunk %d: %v", i, names(chunk)) + l.Debugf("updating chunk %d: %v", i, names(chunk)) newlyCreatedPods, newlyDeletedPods, timedout, canceled := replacePodsAndWait( l, roomManager, mr, clientset, db, redisClient.Client, diff --git a/controller/controller_test.go b/controller/controller_test.go index c4cb19104..0d325e72a 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -353,7 +353,7 @@ var _ = Describe("Controller", func() { mt.MockInsertScheduler(mockDb, nil) mt.MockUpdateScheduler(mockDb, nil, nil) - mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 
0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -440,7 +440,7 @@ cmd: mt.MockInsertScheduler(mockDb, nil) mt.MockUpdateScheduler(mockDb, nil, nil) - mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -545,7 +545,7 @@ portRange: schedulerPortStart := configYaml1.PortRange.Start schedulerPortEnd := configYaml1.PortRange.End mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, - workerPortRange, schedulerPortStart, schedulerPortEnd) + workerPortRange, schedulerPortStart, schedulerPortEnd, 0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -1723,7 +1723,7 @@ portRange: schedulerPortStart := configYaml.PortRange.Start schedulerPortEnd := configYaml.PortRange.End mt.MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, - workerPortRange, schedulerPortStart, schedulerPortEnd) + workerPortRange, schedulerPortStart, schedulerPortEnd, 0) err = mt.MockSetScallingAmount( mockRedisClient, @@ -3610,10 +3610,10 @@ cmd: // Remove old rooms mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml2) - // Create new roome + // Create new rooms // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2) - mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2, 0) + mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -3744,13 +3744,13 @@ portRange: Return(goredis.NewStringResult(workerPortRange, nil)) // Remove old rooms - mt.MockRemoveRoomStatusFromRedis(mockRedisClient, mockPipeline, pods, &configYaml2) + mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml2) // Create new rooms // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale mt.MockCreateRoomsWithPorts(mockRedisClient, mockPipeline, &configYaml2) mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, - workerPortRange, configYaml2.PortRange.Start, configYaml2.PortRange.End) + workerPortRange, configYaml2.PortRange.Start, configYaml2.PortRange.End, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -3917,9 +3917,9 @@ cmd: mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1) // Create new rooms - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2, 0) mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd) + workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -4089,12 +4089,12 @@ portRange: Return(goredis.NewStringResult(workerPortRange, nil)) // Remove old rooms - mt.MockRemoveRoomStatusFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1) + mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1) // Create new rooms mt.MockCreateRoomsWithPorts(mockRedisClient, mockPipeline, &configYaml2) mt.MockGetPortsFromPool(&configYaml2, 
mockRedisClient, mockPortChooser, - workerPortRange, 20000, 20020) + workerPortRange, 20000, 20020, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -4399,11 +4399,7 @@ cmd: Expect(err.Error()).To(Equal("error getting lock")) }) - It("should return error if timeout when deleting rooms", func() { - pods, err := clientset.CoreV1().Pods("controller-name").List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(3)) - + It("should return error if timeout when creating rooms", func() { // Update scheduler calls := mt.NewCalls() @@ -4423,21 +4419,22 @@ cmd: calls.Append( mt.MockSelectScheduler(yaml1, mockDb, nil)) - // Delete first room + calls.Append(mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, + workerPortRange, portStart, portEnd, 1)) + + // Create rooms calls.Append( - mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml1)) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1, 1)) - // Get timeout for waiting pod to be deleted and create new one + // Get timeout for waiting pod to be created calls.Add( mockClock.EXPECT(). Now(). Return(time.Unix(int64(timeoutSec+100), 0))) - // Timeod out, so rollback + // Delete newly created rooms calls.Append( - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1)) - calls.Append(mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd)) + mt.MockRemoveAnyRoomsFromRedis(mockRedisClient, mockPipeline, 1, &configYaml1)) calls.Append( mt.MockReturnRedisLock(mockRedisClient, lockKey, nil)) @@ -4463,11 +4460,7 @@ cmd: Expect(err.Error()).To(Equal("timedout waiting rooms to be replaced, rolled back")) }) - It("should return error if timeout when creating rooms", func() { - pods, err := clientset.CoreV1().Pods("controller-name").List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(3)) - + It("should return error if timeout when deleting rooms", func() { // Update scheduler calls := mt.NewCalls() @@ -4476,145 +4469,50 @@ cmd: mockClock.EXPECT(). Now(). Return(time.Unix(0, 0))) - calls.Add( - mockRedisClient.EXPECT(). - SetNX(lockKey, gomock.Any(), time.Duration(lockTimeoutMs)*time.Millisecond). - Return(goredis.NewBoolResult(true, nil))) + calls.Append( + mt.MockRedisLock(mockRedisClient, lockKey, lockTimeoutMs, true, nil)) // Set new operation manager description - mt.MockSetDescription(opManager, mockRedisClient, "running", nil) + calls.Append( + mt.MockSetDescription(opManager, mockRedisClient, "running", nil)) // Get scheduler from DB - calls.Add( - mockDb.EXPECT(). - Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", configYaml2.Name). - Do(func(scheduler *models.Scheduler, query string, modifier string) { - *scheduler = *models.NewScheduler(configYaml1.Name, configYaml1.Game, yaml1) - })) - - // Delete first room - for _, pod := range pods.Items { - calls.Add( - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) - - room := models.NewRoom(pod.GetName(), pod.GetNamespace()) - - for _, status := range allStatus { - calls.Add( - mockPipeline.EXPECT(). - SRem(models.GetRoomStatusSetRedisKey(room.SchedulerName, status), room.GetRoomRedisKey())) - mockPipeline.EXPECT(). 
- ZRem(models.GetLastStatusRedisKey(room.SchedulerName, status), room.ID) - } - calls.Add(mockPipeline.EXPECT().ZRem(models.GetRoomPingRedisKey(pod.GetNamespace()), room.ID)) - for _, mt := range allMetrics { - calls.Add( - mockPipeline.EXPECT(). - ZRem(models.GetRoomMetricsRedisKey(room.SchedulerName, mt), gomock.Any())) - } - calls.Add(mockPipeline.EXPECT().Del(room.GetRoomRedisKey())) - calls.Add(mockPipeline.EXPECT().Exec()) - break - } + calls.Append( + mt.MockSelectScheduler(yaml1, mockDb, nil)) - // Get timeout for waiting pod to be deleted and create new one + // Create room + calls.Append( + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1, 1)) + calls.Append(mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, + workerPortRange, portStart, portEnd, 1)) calls.Add( mockClock.EXPECT(). Now(). Return(time.Unix(int64(timeoutSec-100), 0))) - // Create new room with updated scheduler - for range pods.Items { - calls.Add(mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) - calls.Add( - mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( - func(schedulerName string, statusInfo map[string]interface{}) { - Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) - Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) - }, - )) - calls.Add( - mockPipeline.EXPECT(). - SAdd(models.GetRoomStatusSetRedisKey(configYaml1.Name, "creating"), gomock.Any())) - calls.Add( - mockPipeline.EXPECT(). - ZAdd(models.GetRoomPingRedisKey(configYaml1.Name), gomock.Any())) - calls.Add( - mockPipeline.EXPECT().Exec()) - calls.Add( - mockRedisClient.EXPECT(). - Get(models.GlobalPortsPoolKey). - Return(goredis.NewStringResult(workerPortRange, nil))) - calls.Add( - mockPortChooser.EXPECT(). - Choose(portStart, portEnd, 2). - Return([]int{5000, 5001})) - break - } + // Delete old rooms + calls.Append( + mt.MockRemoveAnyRoomsFromRedis(mockRedisClient, mockPipeline, 1, &configYaml1)) - // Get timeout for waiting new pod to be created + // Get timeout for waiting old pods to be deleted calls.Add( mockClock.EXPECT(). Now(). Return(time.Unix(int64(timeoutSec+100), 0))) - // Timed out, so rollback - // Delete newly created room - for range pods.Items { - calls.Add( - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) + // Delete newly created rooms + calls.Append( + mt.MockRemoveAnyRoomsFromRedis(mockRedisClient, mockPipeline, 1, &configYaml1)) - for range allStatus { - calls.Add( - mockPipeline.EXPECT(). - SRem(gomock.Any(), gomock.Any())) - calls.Add(mockPipeline.EXPECT(). - ZRem(gomock.Any(), gomock.Any())) - } - for range allMetrics { - calls.Add( - mockPipeline.EXPECT(). 
- ZRem(gomock.Any(), gomock.Any())) - } - calls.Add(mockPipeline.EXPECT().ZRem(gomock.Any(), gomock.Any())) - calls.Add(mockPipeline.EXPECT().Del(gomock.Any())) - calls.Add(mockPipeline.EXPECT().Exec()) - break - } + // Recreate old rooms + calls.Append( + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1, 1)) + calls.Append(mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, + workerPortRange, portStart, portEnd, 1)) - // Create new room to replace the one with old version deleted - for range pods.Items { - calls.Add(mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)) - calls.Add( - mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do( - func(schedulerName string, statusInfo map[string]interface{}) { - Expect(statusInfo["status"]).To(Equal(models.StatusCreating)) - Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1)) - }, - )) - calls.Add( - mockPipeline.EXPECT(). - SAdd(models.GetRoomStatusSetRedisKey(configYaml1.Name, "creating"), gomock.Any())) - calls.Add( - mockPipeline.EXPECT(). - ZAdd(models.GetRoomPingRedisKey(configYaml1.Name), gomock.Any())) - calls.Add( - mockPipeline.EXPECT().Exec()) - calls.Add( - mockRedisClient.EXPECT(). - Get(models.GlobalPortsPoolKey). - Return(goredis.NewStringResult(workerPortRange, nil))) - calls.Add( - mockPortChooser.EXPECT(). - Choose(portStart, portEnd, 2). - Return([]int{5000, 5001})) - break - } + calls.Append( + mt.MockReturnRedisLock(mockRedisClient, lockKey, nil)) - calls.Add( - mockRedisClient.EXPECT(). - Eval(gomock.Any(), []string{lockKey}, gomock.Any()). - Return(goredis.NewCmdResult(nil, nil))) calls.Finish() err = controller.UpdateSchedulerConfig( @@ -4626,7 +4524,7 @@ cmd: redisClient, clientset, &configYaml2, - 10, + maxSurge, mockClock, nil, config, @@ -4659,9 +4557,22 @@ cmd: // Get scheduler from DB calls.Append(mt.MockSelectScheduler(yaml1, mockDb, nil)) - // Delete rooms + // Create and Delete rooms errRedis := errors.New("redis error") for _, pod := range pods.Items { + + // Create new pod + calls.Append( + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2, 1)) + calls.Append( + mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, + workerPortRange, portStart, portEnd, 1)) + // Get time.Now() + calls.Add( + mockClock.EXPECT(). + Now(). + Return(time.Unix(int64(timeoutSec-100), 1))) + // Retrieve ports to pool room := models.NewRoom(pod.GetName(), pod.GetNamespace()) calls.Add( @@ -4691,23 +4602,6 @@ cmd: Return(nil, errRedis)) } - calls.Add( - mockClock.EXPECT(). - Now(). - Return(time.Unix(0, 0))) - - // Create new pods - calls.Append( - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2)) - calls.Append( - mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd)) - - calls.Add( - mockClock.EXPECT(). - Now(). 
- Return(time.Unix(0, 0))) - calls.Append( mt.MockUpdateSchedulersTable(mockDb, nil)) @@ -4787,8 +4681,8 @@ cmd: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2) - mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml2, 0) + mt.MockGetPortsFromPool(&configYaml2, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -4905,131 +4799,7 @@ cmd: } }) - It("should stop on createPodsAsTheyAreDeleted", func() { - yamlString := ` -name: scheduler-name-cancel -autoscaling: - min: 3 - up: - trigger: - limit: 10 -containers: -- name: container1 - image: image1 -` - newYamlString := ` -name: scheduler-name-cancel -autoscaling: - min: 3 - up: - trigger: - limit: 10 -containers: -- name: container1 - image: image2 -` - configYaml, _ := models.NewConfigYAML(yamlString) - newConfigYaml, err := models.NewConfigYAML(newYamlString) - Expect(err).ToNot(HaveOccurred()) - - mt.MockCreateScheduler(clientset, mockRedisClient, mockPipeline, mockDb, - logger, roomManager, mr, yamlString, timeoutSec, mockPortChooser, workerPortRange, portStart, portEnd) - - pods, err := clientset.CoreV1().Pods("scheduler-name-cancel").List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(3)) - - for _, pod := range pods.Items { - Expect(pod.ObjectMeta.Labels["version"]).To(Equal("v1.0")) - } - - // Select current scheduler yaml - mt.MockSelectScheduler(newYamlString, mockDb, nil) - - // Get redis lock - lockKey := "maestro-lock-key-scheduler-name-cancel" - mt.MockRedisLock(mockRedisClient, lockKey, lockTimeoutMs, true, nil) - - // Set new operation manager description - mt.MockSetDescription(opManager, mockRedisClient, "running", nil) - - // Remove old room - for i, pod := range pods.Items { - room := models.NewRoom(pod.GetName(), pod.GetNamespace()) - mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline) - - for _, status := range allStatus { - mockPipeline.EXPECT(). - SRem( - models.GetRoomStatusSetRedisKey(room.SchedulerName, status), - room.GetRoomRedisKey()) - mockPipeline.EXPECT().ZRem( - models.GetLastStatusRedisKey(room.SchedulerName, status), room.ID) - } - for _, mt := range allMetrics { - mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(room.SchedulerName, mt), gomock.Any()) - } - mockPipeline.EXPECT(). 
- ZRem(models.GetRoomPingRedisKey(pod.GetNamespace()), room.ID) - - if i == len(pods.Items)-1 { - mockPipeline.EXPECT().Del(room.GetRoomRedisKey()).Do(func(_ string) { - opManager.Cancel(opManager.GetOperationKey()) - }) - } else { - mockPipeline.EXPECT().Del(room.GetRoomRedisKey()) - } - - mockPipeline.EXPECT().Exec() - } - - // Delete keys from OperationManager (to cancel it) - mt.MockDeleteRedisKey(opManager, mockRedisClient, mockPipeline, nil) - - // Create rooms to rollback - mt.MockCreateRooms(mockRedisClient, mockPipeline, newConfigYaml) - mt.MockGetPortsFromPool(newConfigYaml, mockRedisClient, mockPortChooser, - workerPortRange, portStart, portEnd) - - // Retrieve redis lock - mt.MockReturnRedisLock(mockRedisClient, lockKey, nil) - - err = controller.UpdateSchedulerConfig( - context.Background(), - logger, - roomManager, - mr, - mockDb, - redisClient, - clientset, - configYaml, - maxSurge, - &clock.Clock{}, - nil, - config, - opManager, - ) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("operation was canceled, rolled back")) - - pods, err = clientset.CoreV1().Pods(configYaml.Name).List(metav1.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(pods.Items).To(HaveLen(configYaml.AutoScaling.Min)) - - for _, pod := range pods.Items { - Expect(pod.GetName()).To(ContainSubstring("scheduler-name-cancel-")) - Expect(pod.GetName()).To(HaveLen(len("scheduler-name-cancel-") + 8)) - Expect(pod.Spec.Containers[0].Env[0].Name).To(Equal("MAESTRO_SCHEDULER_NAME")) - Expect(pod.Spec.Containers[0].Env[0].Value).To(Equal("scheduler-name-cancel")) - Expect(pod.Spec.Containers[0].Env[1].Name).To(Equal("MAESTRO_ROOM_ID")) - Expect(pod.Spec.Containers[0].Env[1].Value).To(Equal(pod.GetName())) - Expect(pod.Spec.Containers[0].Env).To(HaveLen(2)) - Expect(pod.ObjectMeta.Labels["heritage"]).To(Equal("maestro")) - Expect(pod.ObjectMeta.Labels["version"]).To(Equal("v1.0")) - } - }) - - It("should stop on waitCreatingPods", func() { + It("should stop on waitCreatingAndDeleteOldPods", func() { yamlString := ` name: scheduler-name-cancel autoscaling: @@ -5053,7 +4823,6 @@ containers: image: image2 ` configYaml, _ := models.NewConfigYAML(yamlString) - newConfigYaml, _ := models.NewConfigYAML(newYamlString) mt.MockCreateScheduler(clientset, mockRedisClient, mockPipeline, mockDb, logger, roomManager, mr, yamlString, timeoutSec, mockPortChooser, workerPortRange, portStart, portEnd) @@ -5079,9 +4848,6 @@ containers: // Delete keys from OperationManager (to cancel it) mt.MockDeleteRedisKey(opManager, mockRedisClient, mockPipeline, nil) - // Create rooms to rollback - mt.MockCreateRooms(mockRedisClient, mockPipeline, newConfigYaml) - // But first, create rooms for i := 0; i < configYaml.AutoScaling.Min; i++ { mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline) @@ -5120,25 +4886,6 @@ containers: mockPipeline.EXPECT().Exec() } - // But first, remove old rooms - for i := 0; i < configYaml2.AutoScaling.Min; i++ { - mockRedisClient.EXPECT().TxPipeline(). 
- Return(mockPipeline) - for _, status := range allStatus { - mockPipeline.EXPECT().SRem( - models.GetRoomStatusSetRedisKey(configYaml.Name, status), gomock.Any()) - mockPipeline.EXPECT().ZRem( - models.GetLastStatusRedisKey(configYaml.Name, status), gomock.Any()) - } - for _, mt := range allMetrics { - mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(configYaml.Name, mt), gomock.Any()) - } - mockPipeline.EXPECT().ZRem( - models.GetRoomPingRedisKey(configYaml.Name), gomock.Any()) - mockPipeline.EXPECT().Del(gomock.Any()) - mockPipeline.EXPECT().Exec() - } - // Retrieve redis lock mt.MockReturnRedisLock(mockRedisClient, lockKey, nil) @@ -5334,8 +5081,8 @@ containers: // Create new roome // It will use the same number of rooms as config1, and ScaleUp to new min in Watcher at AutoScale - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1) - mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml1, 0) + mt.MockGetPortsFromPool(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) @@ -5561,8 +5308,8 @@ containers: mt.MockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, pods, &configYaml) // Create new rooms - mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml) - mt.MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd) + mt.MockCreateRooms(mockRedisClient, mockPipeline, &configYaml, 0) + mt.MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0) // Update new config on schedulers table mt.MockUpdateSchedulersTable(mockDb, nil) diff --git a/controller/utils.go b/controller/utils.go index 17fc5d8a5..9d678cb77 100644 --- a/controller/utils.go +++ b/controller/utils.go @@ -39,41 +39,59 @@ func replacePodsAndWait( willTimeoutAt time.Time, clock clockinterfaces.Clock, configYAML *models.ConfigYAML, - podsToDelete []v1.Pod, + podsChunk []v1.Pod, scheduler *models.Scheduler, operationManager *models.OperationManager, ) (createdPods []v1.Pod, deletedPods []v1.Pod, timedout, canceled bool) { createdPods = []v1.Pod{} deletedPods = []v1.Pod{} - for _, pod := range podsToDelete { - logger.Debugf("deleting pod %s", pod.GetName()) + // create a chunk of pods (chunkSize = maxSurge) + for _, pod := range podsChunk { + logger.Debug("creating pods") + + // create new pod + newPod, err := roomManager.Create(logger, mr, redisClient, + db, clientset, configYAML, scheduler) + + if err != nil { + logger.WithError(err).Debug("error creating pod") + continue + } + + createdPods = append(createdPods, *newPod) + + // wait for new pod to be created + timeout := willTimeoutAt.Sub(clock.Now()) + timedout, canceled = waitCreatingPods( + logger, clientset, timeout, configYAML.Name, + []v1.Pod{*newPod}, operationManager, mr) + if timedout || canceled { + return createdPods, deletedPods, timedout, canceled + } - err := DeletePodAndRoom(logger, roomManager, mr, clientset, redisClient, + // delete old pod + logger.Debugf("deleting pod %s", pod.GetName()) + err = DeletePodAndRoom(logger, roomManager, mr, clientset, redisClient, configYAML, pod.GetName(), reportersConstants.ReasonUpdate) if err == nil || strings.Contains(err.Error(), "redis") { deletedPods = append(deletedPods, pod) } if err != nil { logger.WithError(err).Debugf("error deleting pod %s", pod.GetName()) + 
continue } - } - now := clock.Now() - timeout := willTimeoutAt.Sub(now) - createdPods, timedout, canceled = createPodsAsTheyAreDeleted( - logger, roomManager, mr, clientset, db, redisClient, timeout, configYAML, - deletedPods, scheduler, operationManager) - if timedout || canceled { - return createdPods, deletedPods, timedout, canceled - } - - timeout = willTimeoutAt.Sub(clock.Now()) - timedout, canceled = waitCreatingPods( - logger, clientset, timeout, configYAML.Name, - createdPods, operationManager, mr) - if timedout || canceled { - return createdPods, deletedPods, timedout, canceled + // wait for old pods to be deleted + // we assume that maxSurge == maxUnavailable as we can't set maxUnavailable yet + // so for every pod created in a chunk one is deleted right after it + timeout = willTimeoutAt.Sub(clock.Now()) + timedout, canceled = waitTerminatingPods( + logger, clientset, timeout, configYAML.Name, + []v1.Pod{pod}, operationManager, mr) + if timedout || canceled { + return createdPods, deletedPods, timedout, canceled + } } return createdPods, deletedPods, false, false @@ -136,13 +154,12 @@ func rollback( } waitTimeout := willTimeoutAt.Sub(time.Now()) - err = waitTerminatingPods( + timedout, _ := waitTerminatingPods( logger, clientset, waitTimeout, configYAML.Name, - createdPodChunks[i], - mr, + createdPodChunks[i], nil, mr, ) - if err != nil { - return err + if timedout { + return errors.New("timeout waiting for rooms to be removed") } } @@ -166,81 +183,65 @@ func rollback( } waitTimeout := willTimeoutAt.Sub(time.Now()) - waitCreatingPods(logger, clientset, waitTimeout, configYAML.Name, - newlyCreatedPods, nil, mr) + waitCreatingAndDeleteOldPods(logger, clientset, redisClient, waitTimeout, configYAML, + newlyCreatedPods, nil, nil, nil, mr) } } return nil } -func createPodsAsTheyAreDeleted( +func waitTerminatingPods( l logrus.FieldLogger, - roomManager models.RoomManager, - mr *models.MixedMetricsReporter, clientset kubernetes.Interface, - db pginterfaces.DB, - redisClient redisinterfaces.RedisClient, timeout time.Duration, - configYAML *models.ConfigYAML, + namespace string, deletedPods []v1.Pod, - scheduler *models.Scheduler, operationManager *models.OperationManager, -) (createdPods []v1.Pod, timedout, wasCanceled bool) { + mr *models.MixedMetricsReporter, +) (timedout, wasCanceled bool) { logger := l.WithFields(logrus.Fields{ "operation": "controller.waitTerminatingPods", - "scheduler": configYAML.Name, + "scheduler": namespace, }) - createdPods = []v1.Pod{} - logger.Debugf("pods to terminate: %#v", names(deletedPods)) + logger.Debugf("waiting for pods to terminate: %#v", names(deletedPods)) timeoutTimer := time.NewTimer(timeout) defer timeoutTimer.Stop() ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() - i := 0 for { exit := true select { case <-ticker.C: - if operationManager.WasCanceled() { + if operationManager != nil && operationManager.WasCanceled() { logger.Warn("operation was canceled") - return createdPods, false, true + return false, true } - for j := i; j < len(deletedPods); j++ { - pod := deletedPods[i] + for _, pod := range deletedPods { err := mr.WithSegment(models.SegmentPod, func() error { var err error - _, err = clientset.CoreV1().Pods(configYAML.Name).Get(pod.GetName(), getOptions) + _, err = clientset.CoreV1().Pods(namespace).Get( + pod.GetName(), getOptions, + ) return err }) - if err == nil || !strings.Contains(err.Error(), "not found") { - logger.WithField("pod", pod.GetName()).Debugf("pod still exists") - exit = false - break - } - newPod, err 
:= roomManager.Create(logger, mr, redisClient, - db, clientset, configYAML, scheduler) - if err != nil { + if err == nil || !strings.Contains(err.Error(), "not found") { + logger.WithField("pod", pod.GetName()).Debugf("pod still exists, deleting again") + err = mr.WithSegment(models.SegmentPod, func() error { + return clientset.CoreV1().Pods(namespace).Delete(pod.GetName(), deleteOptions) + }) exit = false - logger. - WithError(err). - Info("error creating pod") break } - - i = j + 1 - - createdPods = append(createdPods, *newPod) } case <-timeoutTimer.C: - err := errors.New("timeout waiting for rooms to be removed") - logger.WithError(err).Error("stopping scale") - return createdPods, true, false + logger.Error("timeout waiting for rooms to be removed") + return true, false } if exit { @@ -249,24 +250,23 @@ func createPodsAsTheyAreDeleted( } } - return createdPods, false, false + return false, false } -func waitTerminatingPods( +func waitCreatingPods( l logrus.FieldLogger, clientset kubernetes.Interface, timeout time.Duration, namespace string, - deletedPods []v1.Pod, + createdPods []v1.Pod, + operationManager *models.OperationManager, mr *models.MixedMetricsReporter, -) error { +) (timedout, wasCanceled bool) { logger := l.WithFields(logrus.Fields{ - "operation": "controller.waitTerminatingPods", + "operation": "controller.waitCreatingPods", "scheduler": namespace, }) - logger.Debugf("waiting for pods to terminate: %#v", names(deletedPods)) - timeoutTimer := time.NewTimer(timeout) defer timeoutTimer.Stop() ticker := time.NewTicker(1 * time.Second) @@ -276,51 +276,83 @@ func waitTerminatingPods( exit := true select { case <-ticker.C: - for _, pod := range deletedPods { + if operationManager.WasCanceled() { + logger.Warn("operation was canceled") + return false, true + } + + for _, pod := range createdPods { + var createdPod *v1.Pod err := mr.WithSegment(models.SegmentPod, func() error { var err error - _, err = clientset.CoreV1().Pods(namespace).Get( + createdPod, err = clientset.CoreV1().Pods(namespace).Get( pod.GetName(), getOptions, ) return err }) + if err != nil && strings.Contains(err.Error(), "not found") { + exit = false + logger. + WithError(err). + WithField("pod", pod.GetName()). + Info("error creating pod, recreating...") - if err == nil || !strings.Contains(err.Error(), "not found") { - logger.WithField("pod", pod.GetName()).Debugf("pod still exists, deleting again") + pod.ResourceVersion = "" err = mr.WithSegment(models.SegmentPod, func() error { - return clientset.CoreV1().Pods(namespace).Delete(pod.GetName(), deleteOptions) + var err error + _, err = clientset.CoreV1().Pods(namespace).Create(&pod) + return err }) + if err != nil { + logger. + WithError(err). + WithField("pod", pod.GetName()). + Errorf("error recreating pod") + } + break + } + + if len(createdPod.Status.Phase) == 0 { + //HACK! 
Trying to detect if we are running unit tests + break + } + + if !models.IsPodReady(createdPod) { + logger.WithField("pod", createdPod.GetName()).Debug("pod not ready yet, waiting...") exit = false break } } case <-timeoutTimer.C: - err := errors.New("timeout waiting for rooms to be removed") - logger.WithError(err).Error("stopping scale") - return err + logger.Error("timeout waiting for rooms to be created") + return true, false } if exit { - logger.Info("terminating pods were successfully removed") + logger.Info("creating pods are successfully running") break } } - return nil + return false, false } -func waitCreatingPods( +func waitCreatingAndDeleteOldPods( l logrus.FieldLogger, clientset kubernetes.Interface, + redisClient redisinterfaces.RedisClient, timeout time.Duration, - namespace string, + configYAML *models.ConfigYAML, createdPods []v1.Pod, + podsToDelete []v1.Pod, + roomManager models.RoomManager, operationManager *models.OperationManager, mr *models.MixedMetricsReporter, -) (timedout, wasCanceled bool) { +) (deletedPods []v1.Pod, timedout, wasCanceled bool) { + deletedPods = []v1.Pod{} logger := l.WithFields(logrus.Fields{ - "operation": "controller.waitCreatingPods", - "scheduler": namespace, + "operation": "controller.waitCreatingAndDeleteOldPods", + "scheduler": configYAML.Name, }) timeoutTimer := time.NewTimer(timeout) @@ -334,14 +366,14 @@ func waitCreatingPods( case <-ticker.C: if operationManager.WasCanceled() { logger.Warn("operation was canceled") - return false, true + return nil, false, true } for _, pod := range createdPods { var createdPod *v1.Pod err := mr.WithSegment(models.SegmentPod, func() error { var err error - createdPod, err = clientset.CoreV1().Pods(namespace).Get( + createdPod, err = clientset.CoreV1().Pods(configYAML.Name).Get( pod.GetName(), getOptions, ) return err @@ -356,7 +388,7 @@ func waitCreatingPods( pod.ResourceVersion = "" err = mr.WithSegment(models.SegmentPod, func() error { var err error - _, err = clientset.CoreV1().Pods(namespace).Create(&pod) + _, err = clientset.CoreV1().Pods(configYAML.Name).Create(&pod) return err }) if err != nil { @@ -368,20 +400,37 @@ func waitCreatingPods( break } - if len(createdPod.Status.Phase) == 0 { - //HACK! Trying to detect if we are running unit tests - break - } - - if !models.IsPodReady(createdPod) { + //HACK! 
Trying to detect if we are running unit tests + // len(createdPod.Status.Phase) > 0 + if len(createdPod.Status.Phase) > 0 && !models.IsPodReady(createdPod) { logger.WithField("pod", createdPod.GetName()).Debug("pod not ready yet, waiting...") exit = false break } + + if podsToDelete != nil && len(podsToDelete) > 0 { + err = DeletePodAndRoom( + logger, + roomManager, + mr, + clientset, + redisClient, + configYAML, + podsToDelete[0].GetName(), + reportersConstants.ReasonUpdate, + ) + if err == nil || strings.Contains(err.Error(), "redis") { + deletedPods = append(deletedPods, podsToDelete[0]) + podsToDelete = podsToDelete[1:] + } + if err != nil { + logger.WithError(err).Debugf("error deleting pod %s", pod.GetName()) + } + } } case <-timeoutTimer.C: logger.Error("timeout waiting for rooms to be created") - return true, false + return nil, true, false } if exit { @@ -390,7 +439,7 @@ func waitCreatingPods( } } - return false, false + return deletedPods, false, false } // DeletePodAndRoom deletes the pod and removes the room from redis diff --git a/testing/common.go b/testing/common.go index 282abb12f..147e76f1b 100644 --- a/testing/common.go +++ b/testing/common.go @@ -169,6 +169,7 @@ func MockDeleteOldVersions( func mockRemoveRoomsFromRedis( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, + numRooms int, pods *v1.PodList, configYaml *models.ConfigYAML, ) (calls *Calls) { @@ -187,6 +188,38 @@ func mockRemoveRoomsFromRedis( string(models.MemAutoScalingPolicyType), } + for index := 0; index < numRooms; index++ { + calls.Add( + mockRedisClient.EXPECT(). + TxPipeline(). + Return(mockPipeline)) + for _, status := range allStatus { + calls.Add( + mockPipeline.EXPECT(). + SRem(models.GetRoomStatusSetRedisKey(configYaml.Name, status), gomock.Any())) + calls.Add( + mockPipeline.EXPECT(). + ZRem(models.GetLastStatusRedisKey(configYaml.Name, status), gomock.Any())) + } + calls.Add( + mockPipeline.EXPECT(). + ZRem(models.GetRoomPingRedisKey(configYaml.Name), gomock.Any())) + for _, mt := range allMetrics { + calls.Add( + mockPipeline.EXPECT().ZRem(models.GetRoomMetricsRedisKey(configYaml.Name, mt), gomock.Any())) + } + calls.Add( + mockPipeline.EXPECT(). + Del(gomock.Any())) + calls.Add( + mockPipeline.EXPECT(). 
+ Exec()) + } + + if pods == nil { + return calls + } + for _, pod := range pods.Items { room := models.NewRoom(pod.GetName(), pod.GetNamespace()) calls.Add( @@ -219,36 +252,40 @@ func mockRemoveRoomsFromRedis( return calls } -// MockRemoveRoomStatusFromRedis removes room only from redis -func MockRemoveRoomStatusFromRedis( +// MockRemoveAnyRoomsFromRedis removes any rooms from redis +func MockRemoveAnyRoomsFromRedis( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, - pods *v1.PodList, + numRooms int, configYaml *models.ConfigYAML, ) (calls *Calls) { - return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, - pods, configYaml) + return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, numRooms, nil, configYaml) } -// MockRemoveRoomsFromRedis mocks the room creation from pod +// MockRemoveRoomsFromRedis removes room only from redis func MockRemoveRoomsFromRedis( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, pods *v1.PodList, configYaml *models.ConfigYAML, ) (calls *Calls) { - return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, - pods, configYaml) + return mockRemoveRoomsFromRedis(mockRedisClient, mockPipeline, 0, pods, configYaml) } func mockCreateRooms( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, configYaml *models.ConfigYAML, + roomCount int, ) (calls *Calls) { calls = NewCalls() - for i := 0; i < configYaml.AutoScaling.Min; i++ { + count := configYaml.AutoScaling.Min + if roomCount > 0 { + count = roomCount + } + + for i := 0; i < count; i++ { calls.Add( mockRedisClient.EXPECT(). TxPipeline(). @@ -285,8 +322,9 @@ func MockCreateRooms( mockRedisClient *redismocks.MockRedisClient, mockPipeline *redismocks.MockPipeliner, configYaml *models.ConfigYAML, + roomCount int, ) (calls *Calls) { - return mockCreateRooms(mockRedisClient, mockPipeline, configYaml) + return mockCreateRooms(mockRedisClient, mockPipeline, configYaml, roomCount) } // MockCreateRoomsWithPorts mocks the creation of rooms on redis when @@ -296,7 +334,7 @@ func MockCreateRoomsWithPorts( mockPipeline *redismocks.MockPipeliner, configYaml *models.ConfigYAML, ) (calls *Calls) { - return mockCreateRooms(mockRedisClient, mockPipeline, configYaml) + return mockCreateRooms(mockRedisClient, mockPipeline, configYaml, 0) } // MockCreateScheduler mocks the creation of a scheduler @@ -355,7 +393,7 @@ func MockCreateScheduler( Times(configYaml.AutoScaling.Min)) calls.Append( - MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd)) + MockGetPortsFromPool(&configYaml, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd, 0)) calls.Append( MockUpdateScheduler(mockDb, nil, nil)) @@ -373,10 +411,15 @@ func MockGetPortsFromPool( mockRedisClient *redismocks.MockRedisClient, mockPortChooser *mocks.MockPortChooser, workerPortRange string, - portStart, portEnd int, + portStart, portEnd, times int, ) (calls *Calls) { calls = NewCalls() + callTimes := configYaml.AutoScaling.Min + if times > 0 { + callTimes = times + } + if !configYaml.HasPorts() { return } @@ -385,7 +428,7 @@ func MockGetPortsFromPool( mockRedisClient.EXPECT(). Get(models.GlobalPortsPoolKey). Return(goredis.NewStringResult(workerPortRange, nil)). - Times(configYaml.AutoScaling.Min) + Times(callTimes) } if mockPortChooser == nil { @@ -400,7 +443,7 @@ func MockGetPortsFromPool( mockPortChooser.EXPECT(). Choose(portStart, portEnd, nPorts). Return(ports). 
- Times(configYaml.AutoScaling.Min) + Times(callTimes) } if configYaml.Version() == "v1" {
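
Note on the behavioural change carried by the utils.go hunks above: replacePodsAndWait no longer deletes a whole chunk of pods and then recreates them; it now replaces pods one at a time — create the new pod, wait for it to be running, delete the old pod, wait for it to terminate — under the stated assumption that maxSurge == maxUnavailable. That is also why the test helpers (MockCreateRooms, MockGetPortsFromPool, MockRemoveAnyRoomsFromRedis) gained explicit count parameters. The sketch below is a minimal, hypothetical illustration of that create-then-delete loop only; the PodOps interface and the Replace/waitFor names are invented for the example and are not part of the Maestro codebase.

package main

import (
	"errors"
	"fmt"
	"time"
)

// PodOps abstracts the few cluster operations the flow needs (hypothetical).
type PodOps interface {
	Create(name string) error
	Delete(name string) error
	IsReady(name string) (bool, error)
	IsGone(name string) (bool, error)
}

// Replace swaps each old pod for a new one, creating first and deleting after,
// so at most one extra pod exists per step (maxSurge == maxUnavailable).
func Replace(ops PodOps, chunk []string, deadline time.Time) error {
	for _, old := range chunk {
		replacement := fmt.Sprintf("%s-new", old)

		// create the replacement pod first
		if err := ops.Create(replacement); err != nil {
			continue // skip this pod, mirroring the real loop's behaviour
		}
		// wait for the replacement to be ready
		if err := waitFor(deadline, func() (bool, error) { return ops.IsReady(replacement) }); err != nil {
			return err
		}
		// only then delete the old pod
		if err := ops.Delete(old); err != nil {
			continue
		}
		// and wait for it to be fully terminated before the next iteration
		if err := waitFor(deadline, func() (bool, error) { return ops.IsGone(old) }); err != nil {
			return err
		}
	}
	return nil
}

// waitFor polls cond once per second until it returns true or the deadline passes.
func waitFor(deadline time.Time, cond func() (bool, error)) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	timer := time.NewTimer(time.Until(deadline))
	defer timer.Stop()
	for {
		select {
		case <-ticker.C:
			if ok, err := cond(); err == nil && ok {
				return nil
			}
		case <-timer.C:
			return errors.New("timedout waiting rooms to be replaced")
		}
	}
}

func main() {}

The real implementation threads the operation manager, metrics reporter, Redis client, and room manager through these waits; the sketch omits them to show only the ordering of create, wait, delete, wait.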