One redis lock per scheduler
henrod committed Jun 21, 2017
1 parent b157547 commit 74db3c6
Showing 10 changed files with 67 additions and 17 deletions.
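
The substance of the change: scheduler updates and the watcher previously serialized on a single global Redis lock key (watcher.lockKey), so operations on unrelated schedulers contended for the same lock. This commit namespaces the key per scheduler by appending the scheduler name. A minimal sketch of the key scheme (the helper name below is illustrative, not code from this commit):

package main

import "fmt"

// lockKeyForScheduler mirrors the pattern the commit applies in both the
// API handler and the watcher: one Redis lock key per scheduler instead
// of a single shared key.
func lockKeyForScheduler(baseKey, schedulerName string) string {
	return fmt.Sprintf("%s-%s", baseKey, schedulerName)
}

func main() {
	// With the base key used in the tests ("maestro-lock-key"),
	// two schedulers now lock on distinct keys.
	fmt.Println(lockKeyForScheduler("maestro-lock-key", "my-scheduler")) // maestro-lock-key-my-scheduler
	fmt.Println(lockKeyForScheduler("maestro-lock-key", "pong"))         // maestro-lock-key-pong
}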
1 change: 1 addition & 0 deletions README.md
@@ -73,6 +73,7 @@ game: pong # several configs can refer to the same game
image: pong/pong:v123
affinity: node-affinity # optional field: if set, rooms will be allocated preferentially to nodes with label "node-affinity": "true"
toleration: node-toleration # optional field: if set, rooms will also be allocated in nodes with this taint
+occupiedTimeout: match-time # how long a match may last. If a room stays in occupied status for longer than occupiedTimeout seconds, it is deleted
ports:
- containerPort: 5050 # port exposed in the container
protocol: UDP # supported protocols are TCP and UDP
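The README addition above documents the new occupiedTimeout field. As a rough illustration of the rule it describes, here is a hypothetical helper; it is not code from this commit, and the actual deletion logic is only exercised by the worker integration test at the end of this diff:

package main

import (
	"fmt"
	"time"
)

// shouldDeleteOccupiedRoom expresses the documented rule: a room that has
// stayed in occupied status for longer than occupiedTimeout seconds is
// deleted. Hypothetical helper, for illustration only.
func shouldDeleteOccupiedRoom(status string, lastStatusChange time.Time, occupiedTimeoutSec int64) bool {
	return status == "occupied" &&
		time.Since(lastStatusChange) > time.Duration(occupiedTimeoutSec)*time.Second
}

func main() {
	lastChange := time.Now().Add(-400 * time.Second)
	// Occupied for 400s against a 300s timeout: the room should be deleted.
	fmt.Println(shouldDeleteOccupiedRoom("occupied", lastChange, 300)) // true
}
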
3 changes: 2 additions & 1 deletion api/scheduler_handler.go
@@ -158,6 +158,7 @@ func (g *SchedulerUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
payload.OccupiedTimeout = g.App.Config.GetInt64("occupiedTimeout")
}
logger.WithField("time", time.Now()).Info("Starting update")
+lockKey := fmt.Sprintf("%s-%s", g.App.Config.GetString("watcher.lockKey"), payload.Name)
err = mr.WithSegment(models.SegmentController, func() error {
return controller.UpdateSchedulerConfig(
l,
@@ -167,7 +168,7 @@ func (g *SchedulerUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
g.App.KubernetesClient,
payload,
timeoutSec, g.App.Config.GetInt("watcher.lockTimeoutMs"),
-g.App.Config.GetString("watcher.lockKey"),
+lockKey,
&clock.Clock{},
)
})
6 changes: 4 additions & 2 deletions api/scheduler_handler_test.go
@@ -448,8 +448,10 @@ var _ = Describe("Scheduler Handler", func() {
*scheduler = *models.NewScheduler(configYaml1.Name, configYaml1.Game, jsonString)
})

+lockKeyNs := fmt.Sprintf("%s-%s", lockKey, configYaml1.Name)
+
mockRedisClient.EXPECT().
-SetNX(lockKey, gomock.Any(), time.Duration(lockTimeoutMS)*time.Millisecond).
+SetNX(lockKeyNs, gomock.Any(), time.Duration(lockTimeoutMS)*time.Millisecond).
Return(redis.NewBoolResult(true, nil))

for _, svc := range svcs.Items {
@@ -485,7 +487,7 @@ var _ = Describe("Scheduler Handler", func() {
Query(gomock.Any(), "UPDATE schedulers SET (name, game, yaml, state, state_last_changed_at, last_scale_op_at) = (?name, ?game, ?yaml, ?state, ?state_last_changed_at, ?last_scale_op_at) WHERE id=?id", gomock.Any())

mockRedisClient.EXPECT().
-Eval(gomock.Any(), []string{lockKey}, gomock.Any()).
+Eval(gomock.Any(), []string{lockKeyNs}, gomock.Any()).
Return(redis.NewCmdResult(nil, nil))

recorder = httptest.NewRecorder()
2 changes: 1 addition & 1 deletion controller/controller.go
@@ -442,7 +442,7 @@ func UpdateSchedulerConfig(
l.WithError(err).Error("error getting watcher lock")
return err
} else if lock == nil {
l.Warn("unable to get watcher lock, maybe some other process has it...")
l.Warnf("unable to get watcher %s lock, maybe some other process has it...", schedulerName)
}
} else if lock.IsLocked() {
break waitForLock
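The handler and watcher tests in this diff spell out the lock protocol behind this retry loop: acquiring the lock is a SetNX on the per-scheduler key with the lock timeout as its TTL, and releasing it is a script Eval against the same key. Maestro drives this through a redis-lock library; the direct go-redis calls and the release script below are an illustrative approximation of that shape, not the library's internals:

package main

import (
	"fmt"
	"time"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Acquire: SetNX on the per-scheduler key, TTL = lock timeout,
	// matching the expectation mocked in the tests above.
	lockKey := "maestro-lock-key-pong" // base key + scheduler name
	acquired, err := client.SetNX(lockKey, "holder-token", 180000*time.Millisecond).Result()
	if err != nil {
		fmt.Println("error getting watcher lock:", err)
		return
	}
	if !acquired {
		fmt.Println("unable to get watcher lock, maybe some other process has it...")
		return
	}

	// ... critical section: update the scheduler config ...

	// Release: an Eval against the same key; this compare-and-delete
	// script is a common idiom and stands in for the library's own script.
	releaseScript := `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end`
	client.Eval(releaseScript, []string{lockKey}, "holder-token")
}
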
2 changes: 1 addition & 1 deletion helm/charts/maestro/Chart.yaml
@@ -1,7 +1,7 @@
name: maestro
home: https://github.com/topfreegames/maestro
description: Maestro api and worker
-version: 2.15.1
+version: 2.15.2
maintainers:
- name: TFGCo
email: backend@tfgco.com
2 changes: 1 addition & 1 deletion metadata/version.go
@@ -8,7 +8,7 @@
package metadata

//Version of Maestro
var Version = "2.15.1"
var Version = "2.15.2"

//KubeVersion is the desired Kubernetes version
var KubeVersion = "v1.6.4"
5 changes: 3 additions & 2 deletions testing/minikube.go
@@ -64,8 +64,8 @@ func NextJsonStr() (string, error) {
{
"name": "{{.Name}}",
"game": "game-name",
"image": "nginx:alpine",
"toleration": "game-name",
"image": "nginx:alpine",
"toleration": "game-name",
"ports": [
{
"containerPort": 8080,
@@ -81,6 +81,7 @@ func NextJsonStr() (string, error) {
"memory": "10Mi",
"cpu": "10m"
},
"occupiedTimeout": 300,
"shutdownTimeout": 10,
"autoscaling": {
"min": 2,
5 changes: 3 additions & 2 deletions watcher/watcher.go
@@ -8,6 +8,7 @@
package watcher

import (
"fmt"
"os"
"os/signal"
"strings"
@@ -89,7 +90,7 @@ func (w *Watcher) loadConfigurationDefaults() {

func (w *Watcher) configure() {
w.AutoScalingPeriod = w.Config.GetInt("watcher.autoScalingPeriod")
-w.LockKey = w.Config.GetString("watcher.lockKey")
+w.LockKey = fmt.Sprintf("%s-%s", w.Config.GetString("watcher.lockKey"), w.SchedulerName)
w.LockTimeoutMS = w.Config.GetInt("watcher.lockTimeoutMs")
var wg sync.WaitGroup
w.gracefulShutdown = &gracefulShutdown{
@@ -141,7 +142,7 @@ func (w *Watcher) Start() {
if err != nil {
l.WithError(err).Error("error getting watcher lock")
} else if lock == nil {
l.Warn("unable to get watcher lock, maybe some other process has it...")
l.Warnf("unable to get watcher %s lock, maybe some other process has it...", w.SchedulerName)
}
} else if lock.IsLocked() {
w.RemoveDeadRooms()
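Taken together, each watcher now ticks on its autoscaling period, tries its own per-scheduler lock, and skips the cycle when another process holds it; watchers for different schedulers no longer block one another. A condensed, self-contained sketch of that loop; the struct, the tryLock callback, and the stub methods are simplifications, not the real types:

package main

import (
	"fmt"
	"time"
)

// watcher is a pared-down stand-in for the real Watcher, modeling only
// the fields this commit touches.
type watcher struct {
	SchedulerName     string
	LockKey           string // now "<watcher.lockKey>-<scheduler name>"
	AutoScalingPeriod int    // seconds
}

// run sketches the Start loop: on each tick, try this scheduler's own
// lock; tryLock stands in for redis-lock's lock-acquisition call.
func (w *watcher) run(tryLock func(key string) (bool, error), cycles int) {
	ticker := time.NewTicker(time.Duration(w.AutoScalingPeriod) * time.Second)
	defer ticker.Stop()
	for i := 0; i < cycles; i++ {
		<-ticker.C
		locked, err := tryLock(w.LockKey)
		if err != nil {
			fmt.Println("error getting watcher lock:", err)
			continue
		}
		if !locked {
			fmt.Printf("unable to get watcher %s lock, maybe some other process has it...\n", w.SchedulerName)
			continue
		}
		w.removeDeadRooms()
		w.autoScale()
	}
}

func (w *watcher) removeDeadRooms() { fmt.Println("removing dead rooms for", w.SchedulerName) }
func (w *watcher) autoScale()       { fmt.Println("autoscaling", w.SchedulerName) }

func main() {
	w := &watcher{SchedulerName: "my-scheduler", LockKey: "maestro-lock-key-my-scheduler", AutoScalingPeriod: 1}
	w.run(func(string) (bool, error) { return true, nil }, 1)
}
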
15 changes: 8 additions & 7 deletions watcher/watcher_test.go
@@ -95,7 +95,7 @@ var _ = Describe("Watcher", func() {
Expect(w.Logger).NotTo(BeNil())
Expect(w.MetricsReporter).To(Equal(mr))
Expect(w.RedisClient).To(Equal(redisClient))
-Expect(w.LockKey).To(Equal(lockKey))
+Expect(w.LockKey).To(Equal(fmt.Sprintf("%s-%s", lockKey, name)))
Expect(w.LockTimeoutMS).To(Equal(lockTimeoutMs))
Expect(w.SchedulerName).To(Equal(name))
})
@@ -108,7 +108,7 @@ var _ = Describe("Watcher", func() {
})
w := watcher.NewWatcher(config, logger, mr, mockDb, redisClient, clientset, name, occupiedTimeout)
Expect(w.AutoScalingPeriod).To(Equal(10))
Expect(w.LockKey).To(Equal("maestro-lock-key"))
Expect(w.LockKey).To(Equal("maestro-lock-key-my-scheduler"))
Expect(w.LockTimeoutMS).To(Equal(180000))
})
})
@@ -133,7 +133,8 @@ var _ = Describe("Watcher", func() {
Expect(w).NotTo(BeNil())

// EnterCriticalSection (lock done by redis-lock)
mockRedisClient.EXPECT().SetNX("maestro-lock-key", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(true, nil)).AnyTimes()
lockKey := fmt.Sprintf("maestro-lock-key-%s", configYaml1.Name)
mockRedisClient.EXPECT().SetNX(lockKey, gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(true, nil)).AnyTimes()

// DeleteRoomsNoPingSince
pKey := models.GetRoomPingRedisKey(configYaml1.Name)
@@ -176,7 +177,7 @@ var _ = Describe("Watcher", func() {
}).Return(&types.Result{}, nil)

// LeaveCriticalSection (unlock done by redis-lock)
-mockRedisClient.EXPECT().Eval(gomock.Any(), []string{"maestro-lock-key"}, gomock.Any()).Return(redis.NewCmdResult(nil, nil)).AnyTimes()
+mockRedisClient.EXPECT().Eval(gomock.Any(), []string{lockKey}, gomock.Any()).Return(redis.NewCmdResult(nil, nil)).AnyTimes()
w.Start()
})

@@ -191,7 +192,7 @@ var _ = Describe("Watcher", func() {
defer func() { w.Run = false }()

// EnterCriticalSection (lock done by redis-lock)
mockRedisClient.EXPECT().SetNX("maestro-lock-key", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, errors.New("some error in lock"))).AnyTimes()
mockRedisClient.EXPECT().SetNX("maestro-lock-key-my-scheduler", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, errors.New("some error in lock"))).AnyTimes()

Expect(func() { go w.Start() }).ShouldNot(Panic())
Eventually(func() bool { return w.Run }).Should(BeTrue())
@@ -210,12 +211,12 @@ var _ = Describe("Watcher", func() {
defer func() { w.Run = false }()

// EnterCriticalSection (lock done by redis-lock)
mockRedisClient.EXPECT().SetNX("maestro-lock-key", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, nil)).AnyTimes()
mockRedisClient.EXPECT().SetNX("maestro-lock-key-my-scheduler", gomock.Any(), gomock.Any()).Return(redis.NewBoolResult(false, nil)).AnyTimes()

Expect(func() { go w.Start() }).ShouldNot(Panic())
Eventually(func() bool { return w.Run }).Should(BeTrue())
Eventually(func() bool { return hook.LastEntry() != nil }, 1500*time.Millisecond).Should(BeTrue())
-Expect(hook.LastEntry().Message).To(Equal("unable to get watcher lock, maybe some other process has it..."))
+Expect(hook.LastEntry().Message).To(Equal("unable to get watcher my-scheduler lock, maybe some other process has it..."))
})
})

43 changes: 43 additions & 0 deletions worker/worker_int_test.go
@@ -20,6 +20,7 @@ import (
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
uuid "github.com/satori/go.uuid"
"github.com/topfreegames/maestro/models"
mt "github.com/topfreegames/maestro/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -221,6 +222,48 @@ var _ = Describe("Worker", func() {
})

It("should delete rooms that timed out with occupied status", func() {
+jsonStr := fmt.Sprintf(`{
+  "name": "%s",
+  "game": "game-name",
+  "image": "nginx:alpine",
+  "toleration": "game-name",
+  "ports": [
+    {
+      "containerPort": 8080,
+      "protocol": "TCP",
+      "name": "tcp"
+    }
+  ],
+  "limits": {
+    "memory": "10Mi",
+    "cpu": "10m"
+  },
+  "requests": {
+    "memory": "10Mi",
+    "cpu": "10m"
+  },
+  "occupiedTimeout": 1,
+  "shutdownTimeout": 10,
+  "autoscaling": {
+    "min": 2,
+    "up": {
+      "delta": 1,
+      "trigger": {
+        "usage": 70,
+        "time": 1
+      },
+      "cooldown": 1
+    },
+    "down": {
+      "delta": 1,
+      "trigger": {
+        "usage": 50,
+        "time": 1
+      },
+      "cooldown": 1
+    }
+  }
+}`, fmt.Sprintf("maestro-test-%s", uuid.NewV4()))
url = fmt.Sprintf("http://%s/scheduler", app.Address)
request, err := http.NewRequest("POST", url, strings.NewReader(jsonStr))
request.Header.Add("Authorization", "Bearer token")