Skip to content

Commit

Permalink
Remove event forward on scheduler operations (create and update).
Browse files Browse the repository at this point in the history
  • Loading branch information
Camila de Andrade Scatolini committed Sep 17, 2019
1 parent 8fdf7ac commit 4026d9d
Show file tree
Hide file tree
Showing 6 changed files with 1 addition and 481 deletions.
33 changes: 0 additions & 33 deletions api/scheduler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/topfreegames/extensions/clock"
"github.com/topfreegames/extensions/middleware"
"github.com/topfreegames/maestro/controller"
"github.com/topfreegames/maestro/eventforwarder"
"github.com/topfreegames/maestro/models"
)

Expand Down Expand Up @@ -97,22 +96,6 @@ func (g *SchedulerCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
"name": payload.Name,
"game": payload.Game,
})

// this forwards the metadata configured for each enabled forwarder
_, err = eventforwarder.ForwardRoomInfo(
r.Context(),
g.App.Forwarders,
db,
kubernetesClient,
payload.Name,
g.App.SchedulerCache,
g.App.Logger,
)

if err != nil {
logger.WithError(err).Error("Room info forward failed.")
}

logger.Debug("Create scheduler succeeded.")
}
Write(w, http.StatusCreated, `{"success": true}`)
Expand Down Expand Up @@ -320,22 +303,6 @@ func updateSchedulerConfigCommon(

return status, description, err
}

// this forwards the metadata configured for each enabled forwarder
_, err = eventforwarder.ForwardRoomInfo(
ctx,
app.Forwarders,
db,
kubernetesClient,
configYaml.Name,
nil, // intentionally omit SchedulerCache to force reload since it is an update
app.Logger,
)

if err != nil {
logger.WithError(err).Error("Room info forward failed.")
}

logger.Info("update scheduler succeeded")
return http.StatusOK, "", nil
}
Expand Down
238 changes: 1 addition & 237 deletions api/scheduler_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/topfreegames/extensions/pg"
"github.com/topfreegames/maestro/api"
"github.com/topfreegames/maestro/controller"
"github.com/topfreegames/maestro/eventforwarder"
"github.com/topfreegames/maestro/login"
"github.com/topfreegames/maestro/models"
"k8s.io/api/core/v1"
Expand Down Expand Up @@ -111,10 +110,7 @@ var _ = Describe("Scheduler Handler", func() {
"forwarders": {
"mockplugin": {
"mockfwd": {
"enabled": true,
"medatada": {
"send": "me"
}
"enabled": true
}
}
}
Expand Down Expand Up @@ -545,63 +541,6 @@ autoscaling:
Expect(obj["success"]).To(Equal(false))
})
})

Context("with eventforwarders", func() {
BeforeEach(func() {
app.Forwarders = []*eventforwarder.Info{
&eventforwarder.Info{
Plugin: "mockplugin",
Name: "mockfwd",
Forwarder: mockEventForwarder1,
},
}
})

It("forwards scheduler event", func() {
mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient)
mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(10)
mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do(
func(schedulerName string, statusInfo map[string]interface{}) {
Expect(statusInfo["status"]).To(Equal(models.StatusCreating))
Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1))
},
).Times(10)
mockPipeline.EXPECT().ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).Times(10)
mockPipeline.EXPECT().SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).Times(10)
mockPipeline.EXPECT().Exec().Times(10)

MockLoadScheduler("scheduler-name", mockDb).Do(func(scheduler *models.Scheduler, query string, modifier string) {
scheduler.YAML = yamlString
scheduler.Game = "game-name"
})

mockEventForwarder1.EXPECT().Forward(gomock.Any(), "schedulerEvent", gomock.Any(), gomock.Any())

MockInsertScheduler(mockDb, nil)
MockUpdateScheduler(mockDb, nil, nil)
mockRedisClient.EXPECT().Get(models.GlobalPortsPoolKey).
Return(goredis.NewStringResult(workerPortRange, nil)).Times(10)

var configYaml1 models.ConfigYAML
err := yaml.Unmarshal([]byte(yamlString), &configYaml1)
Expect(err).NotTo(HaveOccurred())

err = MockSetScallingAmount(
mockRedisClient,
mockPipeline,
mockDb,
clientset,
&configYaml1,
0,
yamlString,
)
Expect(err).NotTo(HaveOccurred())

app.Router.ServeHTTP(recorder, request)
Expect(recorder.Code).To(Equal(201))
Expect(recorder.Body.String()).To(Equal(`{"success": true}`))
})
})
})

Describe("DELETE /scheduler/{schedulerName}", func() {
Expand Down Expand Up @@ -1304,181 +1243,6 @@ autoscaling:
Expect(obj["success"]).To(Equal(false))
})
})

Context("with eventforwarders", func() {
BeforeEach(func() {
app.Forwarders = []*eventforwarder.Info{
&eventforwarder.Info{
Plugin: "mockplugin",
Name: "mockfwd",
Forwarder: mockEventForwarder1,
},
&eventforwarder.Info{
Plugin: "mockplugin",
Name: "anothermockfwd",
Forwarder: mockEventForwarder2,
},
}
})

It("forwards scheduler event", func() {
// Create scheduler
yamlString1 := `
name: scheduler-name
game: game
image: image:v1
autoscaling:
min: 1
up:
delta: 10
trigger:
usage: 70
time: 600
cooldown: 300
down:
delta: 2
trigger:
usage: 50
time: 900
cooldown: 300
`
reader := strings.NewReader(yamlString1)
url := "/scheduler"
request, err := http.NewRequest("POST", url, reader)
Expect(err).NotTo(HaveOccurred())

var configYaml1 models.ConfigYAML
err = yaml.Unmarshal([]byte(yamlString1), &configYaml1)
Expect(err).NotTo(HaveOccurred())

mockRedisTraceWrapper.EXPECT().WithContext(gomock.Any(), mockRedisClient).Return(mockRedisClient).AnyTimes()
mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline).Times(configYaml1.AutoScaling.Min)
mockPipeline.EXPECT().HMSet(gomock.Any(), gomock.Any()).Do(
func(schedulerName string, statusInfo map[string]interface{}) {
Expect(statusInfo["status"]).To(Equal(models.StatusCreating))
Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1))
},
).Times(configYaml1.AutoScaling.Min)
mockPipeline.EXPECT().
ZAdd(models.GetRoomPingRedisKey("scheduler-name"), gomock.Any()).
Times(configYaml1.AutoScaling.Min)
mockPipeline.EXPECT().
SAdd(models.GetRoomStatusSetRedisKey("scheduler-name", "creating"), gomock.Any()).
Times(configYaml1.AutoScaling.Min)
mockPipeline.EXPECT().
Exec().
Times(configYaml1.AutoScaling.Min)

MockInsertScheduler(mockDb, nil)
MockUpdateScheduler(mockDb, nil, nil)

MockLoadScheduler("scheduler-name", mockDb).Do(func(scheduler *models.Scheduler, query string, modifier string) {
scheduler.YAML = yamlString1
scheduler.Game = "game"
})

err = MockSetScallingAmount(
mockRedisClient,
mockPipeline,
mockDb,
clientset,
&configYaml1,
0,
yamlString1,
)
Expect(err).NotTo(HaveOccurred())

app.Router.ServeHTTP(recorder, request)
Expect(recorder.Code).To(Equal(http.StatusCreated))
Expect(recorder.Body.String()).To(Equal(`{"success": true}`))

pods, err := clientset.CoreV1().Pods(configYaml1.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(configYaml1.AutoScaling.Min))

// Update scheduler
yamlString2 := `
name: scheduler-name
game: game
image: image:v1
autoscaling:
min: 1
up:
delta: 10
trigger:
usage: 70
time: 300
cooldown: 300
down:
delta: 2
trigger:
usage: 50
time: 900
cooldown: 300
forwarders:
mockplugin:
mockfwd:
enabled: true
metadata:
data: "to be forwarded"
intField: 123
anothermockfwd:
enabled: true
metadata:
data: "newData"
newInt: 987
`
var configYaml2 models.ConfigYAML
err = yaml.Unmarshal([]byte(yamlString2), &configYaml2)
Expect(err).NotTo(HaveOccurred())

reader = strings.NewReader(yamlString2)
url = fmt.Sprintf("/scheduler/%s", configYaml2.Name)
request, err = http.NewRequest("PUT", url, reader)
Expect(err).NotTo(HaveOccurred())

opManager = models.NewOperationManager(configYaml2.Name, mockRedisClient, logger)
MockOperationManager(opManager, timeoutDur, mockRedisClient, mockPipeline)

mockRedisClient.EXPECT().Ping().AnyTimes()

MockLoadScheduler(configYaml2.Name, mockDb).
Do(func(scheduler *models.Scheduler, query string, modifier string) {
*scheduler = *models.NewScheduler(configYaml1.Name, configYaml1.Game, yamlString1)
})

configLockKeyNs := models.GetSchedulerConfigLockKey(config.GetString("watcher.lockKey"), configYaml1.Name)
MockRedisLock(mockRedisClient, configLockKeyNs, lockTimeoutMs, true, nil)
MockReturnRedisLock(mockRedisClient, configLockKeyNs, nil)

MockUpdateVersionsTable(mockDb, nil)

// Set new operation manager description
MockAnySetDescription(opManager, mockRedisClient, models.OpManagerRunning, nil)

scheduler1 := models.NewScheduler(configYaml1.Name, configYaml1.Game, yamlString)
// Update new config on schedulers table
MockUpdateSchedulersTable(mockDb, nil)
// Add new version into versions table
scheduler1.NextMinorVersion()
MockInsertIntoVersionsTable(scheduler1, mockDb, nil)
// Count to delete old versions if necessary
MockCountNumberOfVersions(scheduler1, numberOfVersions, mockDb, nil)

MockLoadScheduler("scheduler-name", mockDb).Do(func(scheduler *models.Scheduler, query string, modifier string) {
scheduler.YAML = yamlString2
scheduler.Game = "game"
})

mockEventForwarder1.EXPECT().Forward(gomock.Any(), "schedulerEvent", gomock.Any(), gomock.Any())
mockEventForwarder2.EXPECT().Forward(gomock.Any(), "schedulerEvent", gomock.Any(), gomock.Any())

recorder = httptest.NewRecorder()
app.Router.ServeHTTP(recorder, request)
Expect(recorder.Body.String()).To(Equal(`{"success": true}`))
Expect(recorder.Code).To(Equal(http.StatusOK))
})
})
})

Describe("GET /scheduler/{schedulerName}", func() {
Expand Down
1 change: 0 additions & 1 deletion eventforwarder/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package eventforwarder
//Route{...} are the event forwarders routes names
const (
RoutePlayerEvent = "forwardPlayerEvent"
RouteRoomInfo = "forwardRoomInfo"
RouteRoomEvent = "forwardRoomEvent"
)

Expand Down
55 changes: 0 additions & 55 deletions eventforwarder/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,61 +144,6 @@ func ForwardRoomEvent(
return nil, nil
}

// ForwardRoomInfo forwards room info to app eventforwarders
func ForwardRoomInfo(
ctx context.Context,
forwarders []*Info,
db pginterfaces.DB,
kubernetesClient kubernetes.Interface,
schedulerName string,
schedulerCache *models.SchedulerCache,
logger logrus.FieldLogger,
) (res *Response, err error) {
var eventWasForwarded bool
startTime := time.Now()
defer func() {
reportRPCStatus(
eventWasForwarded,
schedulerName, RouteRoomInfo,
db,
schedulerCache,
logger,
err,
time.Now().Sub(startTime),
)
}()

l := logger.WithFields(logrus.Fields{
"op": "forwardRoomInfo",
"scheduler": schedulerName,
})
if len(forwarders) > 0 {
cachedScheduler, err := schedulerCache.LoadScheduler(db, schedulerName, true)
if err != nil {
return nil, err
}
infos := map[string]interface{}{
"game": cachedScheduler.Scheduler.Game,
}
l.WithFields(logrus.Fields{
"schedulerForwarders": len(cachedScheduler.ConfigYAML.Forwarders),
}).Debug("checking enabled forwarders")
if len(cachedScheduler.ConfigYAML.Forwarders) > 0 {
enabledForwarders := getEnabledForwarders(cachedScheduler.ConfigYAML.Forwarders, forwarders)
l.WithFields(logrus.Fields{
"schedulerForwarders": len(cachedScheduler.ConfigYAML.Forwarders),
"enabledForwarders": len(enabledForwarders),
}).Debug("got enabled forwarders")
if len(enabledForwarders) > 0 {
eventWasForwarded = true
return ForwardEventToForwarders(ctx, enabledForwarders, "schedulerEvent", infos, l)
}
}
}
l.Debug("no forwarders configured and enabled")
return nil, nil
}

// ForwardPlayerEvent forwards player event to app eventforwarders
func ForwardPlayerEvent(
ctx context.Context,
Expand Down
Loading

0 comments on commit 4026d9d

Please sign in to comment.