Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/remove leaked ping keys #85

Merged
merged 3 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions api/room_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,11 +1100,9 @@ forwarders:
It("with custom metric and limit", func() {
mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient)
pKey := models.GetRoomMetricsRedisKey(namespace, "cpu")
// expC := &models.RoomsStatusCount{1, 1, 2, 1} // creating,occupied,ready,terminating
expectedRooms := []string{"test-ready-0", "test-ready-1", "test-occupied-0"}
readyKey := models.GetRoomStatusSetRedisKey(namespace, models.StatusReady)
occupiedKey := models.GetRoomStatusSetRedisKey(namespace, models.StatusOccupied)
// rooms := CreateTestRooms(clientset, namespace, expC)

mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)
mockPipeline.EXPECT().ZRange(pKey, int64(0), int64(123-1)).Return(
Expand Down
5 changes: 4 additions & 1 deletion models/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ func GetRooms(redisClient interfaces.RedisClient, schedulerName string, mr *Mixe
}

for _, redisKey := range redisKeys {
result = append(result, RoomFromRedisKey(redisKey))
r := RoomFromRedisKey(redisKey)
if r != "" {
result = append(result, r)
}
}
return result, nil
}
Expand Down
3 changes: 1 addition & 2 deletions testing/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,8 +1600,7 @@ func MockRemoveZombieRooms(
mockPipeline *redismocks.MockPipeliner,
mockRedisClient *redismocks.MockRedisClient,
rooms []string,
schedulerName,
status string,
schedulerName string,
) {
allStatus := []string{
models.StatusCreating,
Expand Down
49 changes: 34 additions & 15 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,18 @@ func (w *Watcher) reportRoomsStatusesRoutine() {

func (w *Watcher) watchRooms() error {
l := w.Logger.WithFields(logrus.Fields{
"operation": "watcher.watchRooms",
"operation": "watcher.watchRooms.EnsureCorrectRooms",
})
w.WithDownscalingLock(l, w.EnsureCorrectRooms)

l = w.Logger.WithFields(logrus.Fields{
"operation": "watcher.watchRooms.RemoveDeadRooms",
})
w.WithDownscalingLock(l, w.RemoveDeadRooms)

l = w.Logger.WithFields(logrus.Fields{
"operation": "watcher.watchRooms.AutoScale",
})
w.WithTerminationLock(l, w.AutoScale)
w.AddUtilizationMetricsToRedis()
return nil
Expand Down Expand Up @@ -628,11 +636,31 @@ func (w *Watcher) RemoveDeadRooms() error {

pods := []v1.Pod{}

// get rooms with no ping
roomsNoPingSince, err := w.roomsWithNoPing(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are not pinging")
return err
}

// get rooms with occupation timeout
roomsOnOccupiedTimeout, err := w.roomsWithOccupationTimeout(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are on occupied timeout state")
return err
}

// get rooms registered
rooms, err := models.GetRooms(w.RedisClient.Client, w.SchedulerName, w.MetricsReporter)
if err != nil {
logger.WithError(err).Error("error listing terminating rooms")
logger.WithError(err).Error("error listing registered rooms")
}

// append rooms with no ping and on occupation timeout
// to make sure these keys don't leak
noPingAndOccupied := append(roomsNoPingSince, roomsOnOccupiedTimeout...)
rooms = append(rooms, noPingAndOccupied...)

if len(rooms) > 0 {
pods, err = w.listPods()
if err != nil {
Expand All @@ -646,20 +674,11 @@ func (w *Watcher) RemoveDeadRooms() error {
logger.WithError(err).Error("failed to remove zombie rooms")
return err
}
}

// get rooms with no ping
roomsNoPingSince, err := w.roomsWithNoPing(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are not pinging")
return err
}

// get rooms with occupation timeout
roomsOnOccupiedTimeout, err := w.roomsWithOccupationTimeout(logger)
if err != nil {
logger.WithError(err).Error("failed to list rooms that are on occupied timeout state")
return err
l := logger.WithFields(logrus.Fields{
"rooms": fmt.Sprintf("%v", rooms),
})
l.Info("successfully deleted zombie rooms")
}

if len(roomsNoPingSince) > 0 || len(roomsOnOccupiedTimeout) > 0 {
Expand Down
23 changes: 12 additions & 11 deletions watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ var _ = Describe("Watcher", func() {
testing.TransformLegacyInMetricsTrigger(mockAutoScaling)

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, configYaml.Name, "terminating")
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, configYaml.Name)

// Mock send usage percentage
testing.MockSendUsage(mockPipeline, mockRedisClient, mockAutoScaling)
Expand Down Expand Up @@ -4122,7 +4122,7 @@ var _ = Describe("Watcher", func() {
testing.MockCreateRoomsAnyTimes(mockRedisClient, mockPipeline, &configYaml, 0)

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{"scheduler:controller-name:rooms:room-0"}, schedulerName, "terminating")
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{"scheduler:controller-name:rooms:room-0"}, schedulerName)

for _, roomName := range expectedRooms {
room := models.NewRoom(roomName, schedulerName)
Expand Down Expand Up @@ -4189,6 +4189,8 @@ var _ = Describe("Watcher", func() {
pKey := models.GetRoomPingRedisKey(schedulerName)
lKey := models.GetLastStatusRedisKey(schedulerName, models.StatusOccupied)
ts := time.Now().Unix() - w.Config.GetInt64("pingTimeout")
expectedRooms := []string{"room-0", "room-1", "room-2"}

// DeleteRoomsNoPingSince
mockRedisClient.EXPECT().ZRangeByScore(
pKey,
Expand All @@ -4198,14 +4200,10 @@ var _ = Describe("Watcher", func() {
max, err := strconv.Atoi(zrangeby.Max)
Expect(err).NotTo(HaveOccurred())
Expect(max).To(BeNumerically("~", ts, 1*time.Second))
}).Return(redis.NewStringSliceResult([]string{}, errors.New("some error"))).AnyTimes()

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, schedulerName, "terminating")
}).Return(redis.NewStringSliceResult([]string{}, errors.New("some error")))

// DeleteRoomsOccupiedTimeout
ts = time.Now().Unix() - w.OccupiedTimeout
expectedRooms := []string{"room1", "room2", "room3"}
mockRedisClient.EXPECT().ZRangeByScore(
lKey,
redis.ZRangeBy{Min: "-inf", Max: strconv.FormatInt(ts, 10)},
Expand All @@ -4217,9 +4215,9 @@ var _ = Describe("Watcher", func() {
}).Return(redis.NewStringSliceResult(expectedRooms, nil))

mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room1", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room2", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room3", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room-0", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room-1", "metadata")
mockPipeline.EXPECT().HGet("scheduler:controller-name:rooms:room-2", "metadata")
mockPipeline.EXPECT().Exec().Return([]redis.Cmder{
redis.NewStringResult(`{"region": "us"}`, nil),
redis.NewStringResult(`{"region": "us"}`, nil),
Expand All @@ -4246,6 +4244,9 @@ var _ = Describe("Watcher", func() {
})
}

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, expectedRooms, schedulerName)

testing.MockLoadScheduler(configYaml.Name, mockDb).
Do(func(scheduler *models.Scheduler, query string, modifier string) {
scheduler.YAML = yaml1
Expand All @@ -4272,7 +4273,7 @@ var _ = Describe("Watcher", func() {
}).Return(redis.NewStringSliceResult([]string{}, nil))

// Mock get terminating rooms
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, schedulerName, "terminating")
testing.MockRemoveZombieRooms(mockPipeline, mockRedisClient, []string{}, schedulerName)

// DeleteRoomsOccupiedTimeout
ts = time.Now().Unix() - w.OccupiedTimeout
Expand Down