From 74db3c69c00ac309450e7467457d0d7edaf91593 Mon Sep 17 00:00:00 2001
From: Henrique Rodrigues
Date: Tue, 20 Jun 2017 21:15:26 -0300
Subject: [PATCH] One redis lock per scheduler

---
 README.md                      |  1 +
 api/scheduler_handler.go       |  3 ++-
 api/scheduler_handler_test.go  |  6 +++--
 controller/controller.go       |  2 +-
 helm/charts/maestro/Chart.yaml |  2 +-
 metadata/version.go            |  2 +-
 testing/minikube.go            |  5 ++--
 watcher/watcher.go             |  5 ++--
 watcher/watcher_test.go        | 15 ++++++------
 worker/worker_int_test.go      | 43 ++++++++++++++++++++++++++++++++++
 10 files changed, 67 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 0c8cae5f2..524001189 100644
--- a/README.md
+++ b/README.md
@@ -73,6 +73,7 @@ game: pong # several configs can refer to the same game
 image: pong/pong:v123
 affinity: node-affinity # optional field: if set, rooms will be allocated preferentially to nodes with label "node-affinity": "true"
 toleration: node-toleration # optional field: if set, rooms will also be allocated in nodes with this taint
+occupiedTimeout: match-time # maximum match duration. If a room stays in occupied status for longer than occupiedTimeout seconds, it is deleted
 ports:
 - containerPort: 5050 # port exposed in the container
   protocol: UDP # supported protocols are TCP and UDP
diff --git a/api/scheduler_handler.go b/api/scheduler_handler.go
index d3a72c297..5696ff725 100644
--- a/api/scheduler_handler.go
+++ b/api/scheduler_handler.go
@@ -158,6 +158,7 @@ func (g *SchedulerUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
 		payload.OccupiedTimeout = g.App.Config.GetInt64("occupiedTimeout")
 	}
 	logger.WithField("time", time.Now()).Info("Starting update")
+	lockKey := fmt.Sprintf("%s-%s", g.App.Config.GetString("watcher.lockKey"), payload.Name)
 	err = mr.WithSegment(models.SegmentController, func() error {
 		return controller.UpdateSchedulerConfig(
 			l,
@@ -167,7 +168,7 @@ func (g *SchedulerUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
 			g.App.KubernetesClient,
 			payload,
 			timeoutSec,
 			g.App.Config.GetInt("watcher.lockTimeoutMs"),
-			g.App.Config.GetString("watcher.lockKey"),
+			lockKey,
 			&clock.Clock{},
 		)
 	})
diff --git a/api/scheduler_handler_test.go b/api/scheduler_handler_test.go
index 62f89166d..07dd4d362 100644
--- a/api/scheduler_handler_test.go
+++ b/api/scheduler_handler_test.go
@@ -448,8 +448,10 @@ var _ = Describe("Scheduler Handler", func() {
 				*scheduler = *models.NewScheduler(configYaml1.Name, configYaml1.Game, jsonString)
 			})

+			lockKeyNs := fmt.Sprintf("%s-%s", lockKey, configYaml1.Name)
+
 			mockRedisClient.EXPECT().
-				SetNX(lockKey, gomock.Any(), time.Duration(lockTimeoutMS)*time.Millisecond).
+				SetNX(lockKeyNs, gomock.Any(), time.Duration(lockTimeoutMS)*time.Millisecond).
 				Return(redis.NewBoolResult(true, nil))

 			for _, svc := range svcs.Items {
@@ -485,7 +487,7 @@
 				Query(gomock.Any(), "UPDATE schedulers SET (name, game, yaml, state, state_last_changed_at, last_scale_op_at) = (?name, ?game, ?yaml, ?state, ?state_last_changed_at, ?last_scale_op_at) WHERE id=?id", gomock.Any())

 			mockRedisClient.EXPECT().
-				Eval(gomock.Any(), []string{lockKey}, gomock.Any()).
+				Eval(gomock.Any(), []string{lockKeyNs}, gomock.Any()).
 				Return(redis.NewCmdResult(nil, nil))

 			recorder = httptest.NewRecorder()
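The API handler above now derives the lock key by appending the scheduler name to the configured base key, and the handler tests assert on the same namespaced key. A minimal sketch of the scheme, using the default base key seen in the watcher specs (the patch inlines the fmt.Sprintf at each call site; this helper is only illustrative):

    package main

    import "fmt"

    // schedulerLockKey namespaces the configured base lock key with the
    // scheduler name, so updates to different schedulers no longer contend
    // for one global lock.
    func schedulerLockKey(baseKey, schedulerName string) string {
    	return fmt.Sprintf("%s-%s", baseKey, schedulerName)
    }

    func main() {
    	fmt.Println(schedulerLockKey("maestro-lock-key", "my-scheduler"))
    	// Output: maestro-lock-key-my-scheduler
    }

With this, an update to one scheduler can proceed while another scheduler's watcher holds its own lock.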
diff --git a/controller/controller.go b/controller/controller.go
index 088085c71..fa58f9eeb 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -442,7 +442,7 @@ func UpdateSchedulerConfig(
 			l.WithError(err).Error("error getting watcher lock")
 			return err
 		} else if lock == nil {
-			l.Warn("unable to get watcher lock, maybe some other process has it...")
+			l.Warnf("unable to get watcher %s lock, maybe some other process has it...", schedulerName)
 		}
 	} else if lock.IsLocked() {
 		break waitForLock
diff --git a/helm/charts/maestro/Chart.yaml b/helm/charts/maestro/Chart.yaml
index 4c56dca24..c1edca9ae 100644
--- a/helm/charts/maestro/Chart.yaml
+++ b/helm/charts/maestro/Chart.yaml
@@ -1,7 +1,7 @@
 name: maestro
 home: https://github.com/topfreegames/maestro
 description: Maestro api and worker
-version: 2.15.1
+version: 2.15.2
 maintainers:
 - name: TFGCo
   email: backend@tfgco.com
diff --git a/metadata/version.go b/metadata/version.go
index 8ec945740..0a3b92d61 100644
--- a/metadata/version.go
+++ b/metadata/version.go
@@ -8,7 +8,7 @@
 package metadata

 //Version of Maestro
-var Version = "2.15.1"
+var Version = "2.15.2"

 //KubeVersion is the desired Kubernetes version
 var KubeVersion = "v1.6.4"
diff --git a/testing/minikube.go b/testing/minikube.go
index ffaf7b4e1..3c2b02de5 100644
--- a/testing/minikube.go
+++ b/testing/minikube.go
@@ -64,8 +64,8 @@ func NextJsonStr() (string, error) {
 	{
 	  "name": "{{.Name}}",
 	  "game": "game-name",
-	  "image": "nginx:alpine",
-	  "toleration": "game-name",
+	  "image": "nginx:alpine",
+	  "toleration": "game-name",
 	  "ports": [
 	    {
 	      "containerPort": 8080,
@@ -81,6 +81,7 @@ func NextJsonStr() (string, error) {
 	    "memory": "10Mi",
 	    "cpu": "10m"
 	  },
+	  "occupiedTimeout": 300,
 	  "shutdownTimeout": 10,
 	  "autoscaling": {
 	    "min": 2,
diff --git a/watcher/watcher.go b/watcher/watcher.go
index d12f3e167..cbc3d84e1 100644
--- a/watcher/watcher.go
+++ b/watcher/watcher.go
@@ -8,6 +8,7 @@
 package watcher

 import (
+	"fmt"
 	"os"
 	"os/signal"
 	"strings"
@@ -89,7 +90,7 @@ func (w *Watcher) loadConfigurationDefaults() {

 func (w *Watcher) configure() {
 	w.AutoScalingPeriod = w.Config.GetInt("watcher.autoScalingPeriod")
-	w.LockKey = w.Config.GetString("watcher.lockKey")
+	w.LockKey = fmt.Sprintf("%s-%s", w.Config.GetString("watcher.lockKey"), w.SchedulerName)
 	w.LockTimeoutMS = w.Config.GetInt("watcher.lockTimeoutMs")
 	var wg sync.WaitGroup
 	w.gracefulShutdown = &gracefulShutdown{
@@ -141,7 +142,7 @@ func (w *Watcher) Start() {
 			if err != nil {
 				l.WithError(err).Error("error getting watcher lock")
 			} else if lock == nil {
-				l.Warn("unable to get watcher lock, maybe some other process has it...")
+				l.Warnf("unable to get watcher %s lock, maybe some other process has it...", w.SchedulerName)
 			}
 		} else if lock.IsLocked() {
 			w.RemoveDeadRooms()
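The watcher now builds the same namespaced key in configure(), so each watcher instance locks only its own scheduler; the locking protocol itself is unchanged. Below is a minimal sketch of the acquire/release cycle the mocks in the specs stand in for, assuming a go-redis style client (the real watcher delegates to the redis-lock package, whose release is the Eval'd Lua script the tests expect; the plain Del here is a simplification):

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/go-redis/redis"
    )

    func main() {
    	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    	lockKey := "maestro-lock-key-my-scheduler" // base key + scheduler name
    	token := "unique-owner-token"              // identifies this lock holder
    	lockTimeoutMS := 180000                    // watcher.lockTimeoutMs default from the specs

    	// Acquire: SET key token NX PX <timeout>; true means we own the lock.
    	acquired, err := client.SetNX(lockKey, token, time.Duration(lockTimeoutMS)*time.Millisecond).Result()
    	if err != nil {
    		fmt.Println("error getting watcher lock:", err) // log and retry on the next tick
    	} else if !acquired {
    		fmt.Println("unable to get watcher my-scheduler lock, maybe some other process has it...")
    	} else {
    		// ... RemoveDeadRooms / autoscaling work runs here ...
    		// Release: redis-lock runs a Lua script (the Eval the mocks expect)
    		// that deletes the key only if the stored token still matches.
    		client.Del(lockKey)
    	}
    }

Because the key embeds the scheduler name, two watchers for different schedulers never block each other, while two replicas watching the same scheduler still do.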
diff --git a/watcher/watcher_test.go b/watcher/watcher_test.go
index bd7f35fc3..4e0736e57 100644
--- a/watcher/watcher_test.go
+++ b/watcher/watcher_test.go
@@ -95,7 +95,7 @@ var _ = Describe("Watcher", func() {
 			Expect(w.Logger).NotTo(BeNil())
 			Expect(w.MetricsReporter).To(Equal(mr))
 			Expect(w.RedisClient).To(Equal(redisClient))
-			Expect(w.LockKey).To(Equal(lockKey))
+			Expect(w.LockKey).To(Equal(fmt.Sprintf("%s-%s", lockKey, name)))
 			Expect(w.LockTimeoutMS).To(Equal(lockTimeoutMs))
 			Expect(w.SchedulerName).To(Equal(name))
 		})
@@ -108,7 +108,7 @@ var _ = Describe("Watcher", func() {
 			})
 			w := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, name, occupiedTimeout)
 			Expect(w.AutoScalingPeriod).To(Equal(10))
-			Expect(w.LockKey).To(Equal("maestro-lock-key"))
+			Expect(w.LockKey).To(Equal("maestro-lock-key-my-scheduler"))
 			Expect(w.LockTimeoutMS).To(Equal(180000))
 		})
 	})
@@ -133,7 +133,8 @@ var _ = Describe("Watcher", func() {
 			Expect(w).NotTo(BeNil())

 			// EnterCriticalSection (lock done by redis-lock)
-			mockRedisClient.EXPECT().SetNX("maestro-lock-key", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(true, nil)).AnyTimes()
+			lockKey := fmt.Sprintf("maestro-lock-key-%s", configYaml1.Name)
+			mockRedisClient.EXPECT().SetNX(lockKey, gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(true, nil)).AnyTimes()

 			// DeleteRoomsNoPingSince
 			pKey := models.GetRoomPingRedisKey(configYaml1.Name)
@@ -176,7 +177,7 @@ var _ = Describe("Watcher", func() {
 			}).Return(&types.Result{}, nil)

 			// LeaveCriticalSection (unlock done by redis-lock)
-			mockRedisClient.EXPECT().Eval(gomock.Any(), []string{"maestro-lock-key"}, gomock.Any()).Return(redis.NewCmdResult(nil, nil)).AnyTimes()
+			mockRedisClient.EXPECT().Eval(gomock.Any(), []string{lockKey}, gomock.Any()).Return(redis.NewCmdResult(nil, nil)).AnyTimes()

 			w.Start()
 		})
@@ -191,7 +192,7 @@ var _ = Describe("Watcher", func() {
 			defer func() { w.Run = false }()

 			// EnterCriticalSection (lock done by redis-lock)
-			mockRedisClient.EXPECT().SetNX("maestro-lock-key", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, errors.New("some error in lock"))).AnyTimes()
+			mockRedisClient.EXPECT().SetNX("maestro-lock-key-my-scheduler", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, errors.New("some error in lock"))).AnyTimes()

 			Expect(func() { go w.Start() }).ShouldNot(Panic())
 			Eventually(func() bool { return w.Run }).Should(BeTrue())
@@ -210,12 +211,12 @@ var _ = Describe("Watcher", func() {
 			defer func() { w.Run = false }()

 			// EnterCriticalSection (lock done by redis-lock)
-			mockRedisClient.EXPECT().SetNX("maestro-lock-key", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, nil)).AnyTimes()
+			mockRedisClient.EXPECT().SetNX("maestro-lock-key-my-scheduler", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, nil)).AnyTimes()

 			Expect(func() { go w.Start() }).ShouldNot(Panic())
 			Eventually(func() bool { return w.Run }).Should(BeTrue())
 			Eventually(func() bool { return hook.LastEntry() != nil }, 1500*time.Millisecond).Should(BeTrue())
-			Expect(hook.LastEntry().Message).To(Equal("unable to get watcher lock, maybe some other process has it..."))
+			Expect(hook.LastEntry().Message).To(Equal("unable to get watcher my-scheduler lock, maybe some other process has it..."))
 		})
 	})
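The last spec above asserts on the per-scheduler warning through a logrus test hook. A self-contained illustration of that pattern, assuming github.com/sirupsen/logrus (the package path and hook API are assumptions, not taken from the patch):

    package main

    import (
    	"fmt"

    	"github.com/sirupsen/logrus/hooks/test"
    )

    func main() {
    	// NewNullLogger returns a logger plus a hook recording every entry.
    	logger, hook := test.NewNullLogger()
    	logger.Warnf("unable to get watcher %s lock, maybe some other process has it...", "my-scheduler")

    	// The spec compares exactly this formatted message.
    	fmt.Println(hook.LastEntry().Message)
    	// unable to get watcher my-scheduler lock, maybe some other process has it...
    }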
"github.com/onsi/gomega" + uuid "github.com/satori/go.uuid" "github.com/topfreegames/maestro/models" mt "github.com/topfreegames/maestro/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -221,6 +222,48 @@ var _ = Describe("Worker", func() { }) It("should delete rooms that timed out with occupied status", func() { + jsonStr := fmt.Sprintf(`{ + "name": "%s", + "game": "game-name", + "image": "nginx:alpine", + "toleration": "game-name", + "ports": [ + { + "containerPort": 8080, + "protocol": "TCP", + "name": "tcp" + } + ], + "limits": { + "memory": "10Mi", + "cpu": "10m" + }, + "requests": { + "memory": "10Mi", + "cpu": "10m" + }, + "occupiedTimeout": 1, + "shutdownTimeout": 10, + "autoscaling": { + "min": 2, + "up": { + "delta": 1, + "trigger": { + "usage": 70, + "time": 1 + }, + "cooldown": 1 + }, + "down": { + "delta": 1, + "trigger": { + "usage": 50, + "time": 1 + }, + "cooldown": 1 + } + } +}`, fmt.Sprintf("maestro-test-%s", uuid.NewV4())) url = fmt.Sprintf("http://%s/scheduler", app.Address) request, err := http.NewRequest("POST", url, strings.NewReader(jsonStr)) request.Header.Add("Authorization", "Bearer token")