Skip to content

Commit

Permalink
Remove leaked ping keys in RemoveDeadRooms
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Felipe Takakura committed Sep 19, 2019
1 parent 055567c commit bea292c
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 31 deletions.
2 changes: 0 additions & 2 deletions api/room_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,11 +1100,9 @@ forwarders:
It("with custom metric and limit", func() {
mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient)
pKey := models.GetRoomMetricsRedisKey(namespace, "cpu")
// expC := &models.RoomsStatusCount{1, 1, 2, 1} // creating,occupied,ready,terminating
expectedRooms := []string{"test-ready-0", "test-ready-1", "test-occupied-0"}
readyKey := models.GetRoomStatusSetRedisKey(namespace, models.StatusReady)
occupiedKey := models.GetRoomStatusSetRedisKey(namespace, models.StatusOccupied)
// rooms := CreateTestRooms(clientset, namespace, expC)

mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)
mockPipeline.EXPECT().ZRange(pKey, int64(0), int64(123-1)).Return(
Expand Down
5 changes: 4 additions & 1 deletion models/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ func GetRooms(redisClient interfaces.RedisClient, schedulerName string, mr *Mixe
}

for _, redisKey := range redisKeys {
result = append(result, RoomFromRedisKey(redisKey))
r := RoomFromRedisKey(redisKey)
if r != "" {
result = append(result, r)
}
}
return result, nil
}
Expand Down
3 changes: 1 addition & 2 deletions testing/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,8 +1600,7 @@ func MockRemoveZombieRooms(
mockPipeline *redismocks.MockPipeliner,
mockRedisClient *redismocks.MockRedisClient,
rooms []string,
schedulerName,
status string,
schedulerName string,
) {
allStatus := []string{
models.StatusCreating,
Expand Down
49 changes: 34 additions & 15 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,18 @@ func (w *Watcher) reportRoomsStatusesRoutine() {

func (w *Watcher) watchRooms() error {
l := w.Logger.WithFields(logrus.Fields{
"operation": "watcher.watchRooms",
"operation": "watcher.watchRooms.EnsureCorrectRooms",
})
w.WithDownscalingLock(l, w.EnsureCorrectRooms)

l = w.Logger.WithFields(logrus.Fields{
"operation": "watcher.watchRooms.RemoveDeadRooms",
})
w.WithDownscalingLock(l, w.RemoveDeadRooms)

l = w.Logger.WithFields(logrus.Fields{
"operation": "watcher.watchRooms.AutoScale",
})
w.WithTerminationLock(l, w.AutoScale)
w.AddUtilizationMetricsToRedis()
return nil
Expand Down Expand Up @@ -628,11 +636,31 @@ func (w *Watcher) RemoveDeadRooms() error {

pods := []v1.Pod{}

// get rooms with no ping
roomsNoPingSince, err := w.roomsWithNoPing(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are not pinging")
return err
}

// get rooms with occupation timeout
roomsOnOccupiedTimeout, err := w.roomsWithOccupationTimeout(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are on occupied timeout state")
return err
}

// get rooms registered
rooms, err := models.GetRooms(w.RedisClient.Client, w.SchedulerName, w.MetricsReporter)
if err != nil {
logger.WithError(err).Error("error listing terminating rooms")
logger.WithError(err).Error("error listing registered rooms")
}

// append rooms with no ping and on occupation timeout
// to make sure these keys don't leak
noPingAndOccupied := append(roomsNoPingSince, roomsOnOccupiedTimeout...)
rooms = append(rooms, noPingAndOccupied...)

if len(rooms) > 0 {
pods, err = w.listPods()
if err != nil {
Expand All @@ -646,20 +674,11 @@ func (w *Watcher) RemoveDeadRooms() error {
logger.WithError(err).Error("failed to remove zombie rooms")
return err
}
}

// get rooms with no ping
roomsNoPingSince, err := w.roomsWithNoPing(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are not pinging")
return err
}

// get rooms with occupation timeout
roomsOnOccupiedTimeout, err := w.roomsWithOccupationTimeout(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are on occupied timeout state")
return err
l := logger.WithFields(logrus.Fields{
"rooms": fmt.Sprintf("%v", rooms),
})
l.Info("successfully deleted zombie rooms")
}

if len(roomsNoPingSince) > 0 || len(roomsOnOccupiedTimeout) > 0 {
Expand Down
23 changes: 12 additions & 11 deletions watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ var _ = Describe("Watcher", func() {
testing.TransformLegacyInMetricsTrigger(mockAutoScaling)

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, configYaml.Name, "terminating")
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, configYaml.Name)

// Mock send usage percentage
testing.MockSendUsage(mockPipeline, mockRedisClient, mockAutoScaling)
Expand Down Expand Up @@ -4122,7 +4122,7 @@ var _ = Describe("Watcher", func() {
testing.MockCreateRoomsAnyTimes(mockRedisClient, mockPipeline, &configYaml, 0)

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{"scheduler:controller-name:rooms:room-0"}, schedulerName, "terminating")
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{"scheduler:controller-name:rooms:room-0"}, schedulerName)

for _, roomName := range expectedRooms {
room := models.NewRoom(roomName, schedulerName)
Expand Down Expand Up @@ -4189,6 +4189,8 @@ var _ = Describe("Watcher", func() {
pKey := models.GetRoomPingRedisKey(schedulerName)
lKey := models.GetLastStatusRedisKey(schedulerName, models.StatusOccupied)
ts := time.Now().Unix() - w.Config.GetInt64("pingTimeout")
expectedRooms := []string{"room-0", "room-1", "room-2"}

// DeleteRoomsNoPingSince
mockRedisClient.EXPECT().ZRangeByScore(
pKey,
Expand All @@ -4198,14 +4200,10 @@ var _ = Describe("Watcher", func() {
max, err := strconv.Atoi(zrangeby.Max)
Expect(err).NotTo(HaveOccurred())
Expect(max).To(BeNumerically("~", ts, 1*time.Second))
}).Return(redis.NewStringSliceResult([]string{}, errors.New("some error"))).AnyTimes()

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, schedulerName, "terminating")
}).Return(redis.NewStringSliceResult([]string{}, errors.New("some error")))

// DeleteRoomsOccupiedTimeout
ts = time.Now().Unix() - w.OccupiedTimeout
expectedRooms := []string{"room1", "room2", "room3"}
mockRedisClient.EXPECT().ZRangeByScore(
lKey,
redis.ZRangeBy{Min: "-inf", Max: strconv.FormatInt(ts, 10)},
Expand All @@ -4217,9 +4215,9 @@ var _ = Describe("Watcher", func() {
}).Return(redis.NewStringSliceResult(expectedRooms, nil))

mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room1", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room2", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room3", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room-0", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room-1", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room-2", "metadata")
mockPipeline.EXPECT().Exec().Return([]redis.Cmder{
redis.NewStringResult(`{"region": "us"}`, nil),
redis.NewStringResult(`{"region": "us"}`, nil),
Expand All @@ -4246,6 +4244,9 @@ var _ = Describe("Watcher", func() {
})
}

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, expectedRooms, schedulerName)

testing.MockLoadScheduler(configYaml.Name, mockDb).
Do(func(scheduler *models.Scheduler, query string, modifier string) {
scheduler.YAML = yaml1
Expand All @@ -4272,7 +4273,7 @@ var _ = Describe("Watcher", func() {
}).Return(redis.NewStringSliceResult([]string{}, nil))

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, schedulerName, "terminating")
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, schedulerName)

// DeleteRoomsOccupiedTimeout
ts = time.Now().Unix() - w.OccupiedTimeout
Expand Down

0 comments on commit bea292c

Please sign in to comment.